abdullah alamoudi has submitted this change and it was merged.

Change subject: [ASTERIXDB-2231][STO] Separate primary op tracker for each partition
......................................................................

[ASTERIXDB-2231][STO] Separate primary op tracker for each partition

- user model changes: no
- storage format changes: no.
- interface changes: yes.

Details:
- Separate primary index operation tracker for each partition,
  instead of having a global one on each NC to achieve better scalability.
- As a coordinated change, separate component id generator for each
  partition as well.
- Add partition to transaction context so that transaction operations
  can operate on proper op tracker.
- Fixes [ASTERIXDB-2232] to calculate dataset partitions correctly.

Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2263
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
63 files changed, 407 insertions(+), 269 deletions(-)

Approvals: Anon. E.
Moose #1000171: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index c554cbd..366438a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -109,15 +109,15 @@ private ILSMMergePolicyFactory metadataMergePolicyFactory; private final INCServiceContext ncServiceContext; private final IResourceIdFactory resourceIdFactory; - private CompilerProperties compilerProperties; - private ExternalProperties externalProperties; - private MetadataProperties metadataProperties; - private StorageProperties storageProperties; - private TransactionProperties txnProperties; - private ActiveProperties activeProperties; - private BuildProperties buildProperties; - private ReplicationProperties replicationProperties; - private MessagingProperties messagingProperties; + private final CompilerProperties compilerProperties; + private final ExternalProperties externalProperties; + private final MetadataProperties metadataProperties; + private final StorageProperties storageProperties; + private final TransactionProperties txnProperties; + private final ActiveProperties activeProperties; + private final BuildProperties buildProperties; + private final ReplicationProperties replicationProperties; + private final MessagingProperties messagingProperties; private final NodeProperties nodeProperties; private ExecutorService threadExecutor; private IDatasetMemoryManager datasetMemoryManager; @@ -373,8 +373,8 @@ } @Override - public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) { - return datasetLifecycleManager.getOperationTracker(datasetID); + public ILSMOperationTracker 
getPrimaryOperationTracker(int datasetID, int partition) { + return datasetLifecycleManager.getOperationTracker(datasetID, partition); } @Override diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index a46b029..c6232f5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -144,7 +144,7 @@ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); @@ -153,7 +153,7 @@ // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); @@ -201,7 +201,7 @@ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); StorageTestUtils.searchAndAssertCount(nc, PARTITION, 
StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); @@ -227,7 +227,7 @@ // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); @@ -276,7 +276,7 @@ firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents( @@ -298,7 +298,7 @@ lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); @@ -702,7 +702,7 @@ public void run() { ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); try { - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); lsmAccessor.deleteComponents(predicate); } catch (HyracksDataException e) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java index 367d0b9..62705cc 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java @@ -251,7 +251,7 @@ /** * This test update partition 0, schedule flush and modify partition 1 - * Then ensure that in partition 1, primary and secondary have different component ids + * Then ensure that in partition 1, primary and secondary have the same component ids */ @Test public void testAllocateWhileFlushIsScheduled() { @@ -400,7 +400,8 @@ AtomicBoolean arrivedAtSchduleFlush = new AtomicBoolean(false); AtomicBoolean finishedSchduleFlush = new AtomicBoolean(false); MutableBoolean proceedToScheduleFlush = new MutableBoolean(false); - addOpTrackerCallback(primaryLsmBtrees[0], new ITestOpCallback<Void>() { + // keep track of the flush of partition 1 since partitions 0 and 1 are flushed seperately + addOpTrackerCallback(primaryLsmBtrees[1], new ITestOpCallback<Void>() { @Override public void before(Void t) { synchronized (arrivedAtSchduleFlush) { diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java index 4bfc581..c69ffe5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java @@ -22,7 +22,6 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; @@ -52,7 +51,7 @@ } @Override - public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { completedFlushes = 0; completedMerges = 0; rollbackFlushes = 0; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java index e376ff9..9a528d3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java @@ -33,9 +33,9 @@ private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>(); - public TestPrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo, + public TestPrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo, 
ILSMComponentIdGenerator idGenerator) { - super(datasetID, logManager, dsInfo, idGenerator); + super(datasetID, partition, logManager, dsInfo, idGenerator); } public void addCallback(ITestOpCallback<Void> callback) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java index 5d7a7c6..e6b34b8 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java @@ -20,19 +20,22 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.Map; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.context.DatasetResource; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.common.IResource; public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperationTrackerFactory { private static final long serialVersionUID = 1L; - private int datasetId; + private final int datasetId; public TestPrimaryIndexOperationTrackerFactory(int datasetId) { super(datasetId); @@ -40,17 +43,19 @@ } @Override - public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) { + public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) { try { INcApplicationContext appCtx = (INcApplicationContext) 
ctx.getApplicationContext(); DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager(); DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId); - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); + int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); + PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition); if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) { - Field opTrackerField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker"); - opTracker = new TestPrimaryIndexOperationTracker(datasetId, - appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), dsr.getIdGenerator()); - setFinal(opTrackerField, dsr, opTracker); + Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers"); + opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition, + appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), + dslcManager.getComponentIdGenerator(datasetId, partition)); + replaceMapEntry(opTrackersField, dsr, partition, opTracker); } return opTracker; } catch (Exception e) { @@ -65,4 +70,14 @@ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); field.set(obj, newValue); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + static void replaceMapEntry(Field field, Object obj, Object key, Object value) + throws Exception, IllegalAccessException { + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + Map map = (Map) field.get(obj); + map.put(key, value); + } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java index a10c234..70e5f6e 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java @@ -42,6 +42,7 @@ import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; import org.junit.After; import org.junit.Assert; @@ -203,8 +204,10 @@ ((INcApplicationContext) integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager(); int maxMetadatasetId = 14; for (int i = 1; i <= maxMetadatasetId; i++) { - if (datasetLifecycleManager.getIndex(i, i) != null) { - final PrimaryIndexOperationTracker opTracker = datasetLifecycleManager.getOperationTracker(i); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(i, i); + if (index != null) { + final PrimaryIndexOperationTracker opTracker = + (PrimaryIndexOperationTracker) index.getOperationTracker(); Assert.assertEquals(0, opTracker.getNumActiveOperations()); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java index a1978eb..6a70a29 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java @@ -170,7 +170,7 @@ final TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL); final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager(); final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options); - txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true); + 
txnCtx.register(resourceId, 0, index, NoOpOperationCallback.INSTANCE, true); return txnCtx; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 41c5ade..4441c6e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -75,17 +75,19 @@ * creates (if necessary) and returns the primary index operation tracker of a dataset. * * @param datasetId + * @param partition * @return */ - PrimaryIndexOperationTracker getOperationTracker(int datasetId); + PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition); /** * creates (if necessary) and returns the component Id generator of a dataset. * * @param datasetId + * @param partition * @return */ - ILSMComponentIdGenerator getComponentIdGenerator(int datasetId); + ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition); /** * creates (if necessary) and returns the dataset virtual buffer caches. 
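
Reader's note on the IDatasetLifecycleManager hunk above: both accessors now take a partition and are documented as "creates (if necessary) and returns". A minimal sketch of that lookup pattern follows, assuming a map keyed by partition. The class names SketchOpTracker and SketchDatasetTrackers are hypothetical stand-ins for illustration only; they are not the AsterixDB classes and not the code in this change.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a primary index operation tracker (assumption, not AsterixDB code).
final class SketchOpTracker {
    private final int datasetId;
    private final int partition;
    private final AtomicInteger activeOps = new AtomicInteger();

    SketchOpTracker(int datasetId, int partition) {
        this.datasetId = datasetId;
        this.partition = partition;
    }

    int getDatasetId() {
        return datasetId;
    }

    int getPartition() {
        return partition;
    }

    // Called when an operation starts on this partition.
    void beforeOperation() {
        activeOps.incrementAndGet();
    }

    // Called when an operation on this partition completes.
    void afterOperation() {
        activeOps.decrementAndGet();
    }

    int getNumActiveOperations() {
        return activeOps.get();
    }
}

// Hypothetical per-dataset holder: one tracker per partition, created on first use.
final class SketchDatasetTrackers {
    private final int datasetId;
    private final Map<Integer, SketchOpTracker> trackers = new ConcurrentHashMap<>();

    SketchDatasetTrackers(int datasetId) {
        this.datasetId = datasetId;
    }

    // "Creates (if necessary) and returns" the tracker of the given partition.
    SketchOpTracker getOperationTracker(int partition) {
        return trackers.computeIfAbsent(partition, p -> new SketchOpTracker(datasetId, p));
    }

    // With one tracker per partition, a dataset-wide check has to visit all of them.
    boolean isIdle() {
        for (SketchOpTracker tracker : trackers.values()) {
            if (tracker.getNumActiveOperations() != 0) {
                return false;
            }
        }
        return true;
    }
}
```

In this sketch, operations on partition 1 never touch the counter of partition 0, which is the scalability point the commit message makes. The cost is that any dataset-level decision must loop over every partition's tracker rather than read a single counter.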
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 8a83c7b..fffc170 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -66,7 +66,7 @@ IResourceIdFactory getResourceIdFactory(); - ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID); + ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int partition); void initialize(boolean initialRun) throws IOException, ACIDException, AlgebricksException; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index e5fc998..41461ec 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.commons.lang3.tuple.Pair; @@ -92,10 +91,10 @@ ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft()); ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight()); ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId()); - Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); - int partition = getIndexPartition(index, indexInfos); - triggerScheduledMerge(targetId, - indexInfos.stream().filter(info -> info.getPartition() 
== partition).collect(Collectors.toSet())); + int partition = ((PrimaryIndexOperationTracker) index.getOperationTracker()).getPartition(); + Set<ILSMIndex> indexes = + datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetPartitionOpenIndexes(partition); + triggerScheduledMerge(targetId, indexes); return true; } @@ -107,11 +106,8 @@ * @param indexInfos * @throws HyracksDataException */ - private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos) - throws HyracksDataException { - for (IndexInfo info : indexInfos) { - ILSMIndex lsmIndex = info.getIndex(); - + private void triggerScheduledMerge(ILSMComponentId targetId, Set<ILSMIndex> indexes) throws HyracksDataException { + for (ILSMIndex lsmIndex : indexes) { List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); if (isMergeOngoing(immutableComponents)) { continue; @@ -131,14 +127,5 @@ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE); accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } - } - - private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) { - for (IndexInfo info : indexInfos) { - if (info.getIndex() == index) { - return info.getPartition(); - } - } - return -1; } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 9d63818..44baf77 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -30,6 +30,9 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { private static final Logger LOGGER = LogManager.getLogger(); + // partition -> index + private final Map<Integer, Set<IndexInfo>> partitionIndexes; + // resourceID -> index private final 
Map<Long, IndexInfo> indexes; private final int datasetID; private int numActiveIOOps; @@ -40,6 +43,7 @@ private boolean durable; public DatasetInfo(int datasetID) { + this.partitionIndexes = new HashMap<>(); this.indexes = new HashMap<>(); this.setLastAccess(-1); this.datasetID = datasetID; @@ -69,26 +73,17 @@ notifyAll(); } - public synchronized Set<ILSMIndex> getDatasetIndexes() { - Set<ILSMIndex> datasetIndexes = new HashSet<>(); - for (IndexInfo iInfo : getIndexes().values()) { - if (iInfo.isOpen()) { - datasetIndexes.add(iInfo.getIndex()); + public synchronized Set<ILSMIndex> getDatasetPartitionOpenIndexes(int partition) { + Set<ILSMIndex> indexSet = new HashSet<>(); + Set<IndexInfo> partitionIndexInfos = this.partitionIndexes.get(partition); + if (partitionIndexInfos != null) { + for (IndexInfo iInfo : partitionIndexInfos) { + if (iInfo.isOpen()) { + indexSet.add(iInfo.getIndex()); + } } } - - return datasetIndexes; - } - - public synchronized Set<IndexInfo> getDatsetIndexInfos() { - Set<IndexInfo> infos = new HashSet<>(); - for (IndexInfo iInfo : getIndexes().values()) { - if (iInfo.isOpen()) { - infos.add(iInfo); - } - } - - return infos; + return indexSet; } @Override @@ -160,6 +155,18 @@ return indexes; } + public synchronized void addIndex(long resourceID, IndexInfo indexInfo) { + indexes.put(resourceID, indexInfo); + partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo); + } + + public synchronized void removeIndex(long resourceID) { + IndexInfo info = indexes.remove(resourceID); + if (info != null) { + partitionIndexes.get(info.getPartition()).remove(info); + } + } + public boolean isRegistered() { return isRegistered; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java index 7b8397c..83e3144 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java @@ -21,9 +21,12 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; +import org.apache.hyracks.storage.common.IResource; /** * This factory implementation is used by AsterixDB layer so that indexes of a dataset (/partition) @@ -41,10 +44,12 @@ } @Override - public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) { + public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) + throws HyracksDataException { IDatasetLifecycleManager dslcManager = ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager(); - return dslcManager.getComponentIdGenerator(datasetId); + int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); + return dslcManager.getComponentIdGenerator(datasetId, partition); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 3a70515..1a61b8f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -21,6 +21,7 @@ import 
java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -139,7 +140,7 @@ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); } - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); + PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition()); if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) { if (LOGGER.isErrorEnabled()) { final String logMsg = String.format( @@ -155,7 +156,7 @@ DatasetInfo dsInfo = dsr.getDatasetInfo(); dsInfo.waitForIO(); closeIndex(iInfo); - dsInfo.getIndexes().remove(resourceID); + dsInfo.removeIndex(resourceID); if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty() && !dsInfo.isExternal()) { removeDatasetFromCache(dsInfo.getDatasetID()); @@ -203,10 +204,7 @@ List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values()); Collections.sort(datasetsResources); for (DatasetResource dsr : datasetsResources) { - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); - if (opTracker != null && opTracker.getNumActiveOperations() == 0 - && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen() - && !dsr.isMetadataDataset()) { + if (isCandidateDatasetForEviction(dsr)) { closeDataset(dsr); LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID()); return true; @@ -215,14 +213,18 @@ return false; } - private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException { - if (iInfo.isOpen()) { - ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); + private boolean isCandidateDatasetForEviction(DatasetResource dsr) { + for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) { + if 
(opTracker.getNumActiveOperations() != 0) { + return false; + } + } + if (dsr.getDatasetInfo().getReferenceCount() != 0 || !dsr.getDatasetInfo().isOpen() + || dsr.isMetadataDataset()) { + return false; } - // Wait for the above flush op. - dsInfo.waitForIO(); + return true; } public DatasetResource getDatasetLifecycle(int did) { @@ -234,12 +236,9 @@ dsr = datasets.get(did); if (dsr == null) { DatasetInfo dsInfo = new DatasetInfo(did); - ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); - PrimaryIndexOperationTracker opTracker = - new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator); DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties, memoryManager.getNumPages(did), numPartitions); - dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator); + dsr = new DatasetResource(dsInfo, vbcs); datasets.put(did, dsr); } return dsr; @@ -318,13 +317,33 @@ } @Override - public PrimaryIndexOperationTracker getOperationTracker(int datasetId) { - return datasets.get(datasetId).getOpTracker(); + public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) { + DatasetResource dataset = datasets.get(datasetId); + PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition); + if (opTracker == null) { + populateOpTrackerAndIdGenerator(dataset, partition); + opTracker = dataset.getOpTracker(partition); + } + return opTracker; } @Override - public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) { - return datasets.get(datasetId).getIdGenerator(); + public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) { + DatasetResource dataset = datasets.get(datasetId); + ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition); + if (generator == null) { + populateOpTrackerAndIdGenerator(dataset, partition); + generator = dataset.getComponentIdGenerator(partition); + } + return generator; + 
} + + private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) { + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(); + PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition, + logManager, dataset.getDatasetInfo(), idGenerator); + dataset.setPrimaryIndexOperationTracker(partition, opTracker); + dataset.setIdGenerator(partition, idGenerator); } private void validateDatasetLifecycleManagerState() throws HyracksDataException { @@ -357,31 +376,40 @@ public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException { //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN for (DatasetResource dsr : datasets.values()) { - PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); - synchronized (opTracker) { - for (IndexInfo iInfo : dsr.getIndexes().values()) { - AbstractLSMIOOperationCallback ioCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); - if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() - || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) { - long firstLSN = ioCallback.getFirstLSN(); - if (firstLSN < targetLSN) { - LOGGER.info("Checkpoint flush dataset {}", dsr.getDatasetID()); - opTracker.setFlushOnExit(true); - if (opTracker.getNumActiveOperations() == 0) { - // No Modify operations currently, we need to trigger the flush and we can do so safely - opTracker.flushIfRequested(); - } - break; - } + for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) { + // check all partitions + synchronized (opTracker) { + scheduleAsyncFlushForLaggingDatasetPartition(dsr, opTracker, targetLSN); + } + } + } + } + + private void scheduleAsyncFlushForLaggingDatasetPartition(DatasetResource dsr, + PrimaryIndexOperationTracker opTracker, long targetLSN) throws HyracksDataException { + int partition = 
opTracker.getPartition(); + for (ILSMIndex lsmIndex : dsr.getDatasetInfo().getDatasetPartitionOpenIndexes(partition)) { + AbstractLSMIOOperationCallback ioCallback = + (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); + if (!(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() + || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) { + long firstLSN = ioCallback.getFirstLSN(); + if (firstLSN < targetLSN) { + LOGGER.info("Checkpoint flush dataset {} partition {}", dsr.getDatasetID(), partition); + opTracker.setFlushOnExit(true); + if (opTracker.getNumActiveOperations() == 0) { + // No Modify operations currently, we need to trigger the flush and we can do so safely + opTracker.flushIfRequested(); } + break; } } } } /* - * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled + * This method can only be called asynchronously safely if we're sure no modify operation + * will take place until the flush is scheduled */ private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException { DatasetInfo dsInfo = dsr.getDatasetInfo(); @@ -389,53 +417,61 @@ // no memory components for external dataset return; } - PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker(); - if (primaryOpTracker.getNumActiveOperations() > 0) { - throw new IllegalStateException( - "flushDatasetOpenIndexes is called on a dataset with currently active operations"); - } + for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) { + // flush each partition one by one + if (primaryOpTracker.getNumActiveOperations() > 0) { + throw new IllegalStateException( + "flushDatasetOpenIndexes is called on a dataset with currently active operations"); + } + int partition = primaryOpTracker.getPartition(); + Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition); + ILSMComponentIdGenerator 
idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition); + idGenerator.refresh(); - ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID()); - idGenerator.refresh(); + if (dsInfo.isDurable()) { + synchronized (logRecord) { + TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null); + try { + logManager.log(logRecord); + } catch (ACIDException e) { + throw new HyracksDataException("could not write flush log while closing dataset", e); + } - if (dsInfo.isDurable()) { - synchronized (logRecord) { - TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null); - try { - logManager.log(logRecord); - } catch (ACIDException e) { - throw new HyracksDataException("could not write flush log while closing dataset", e); + try { + //notification will come from LogBuffer class (notifyFlushTerminator) + logRecord.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } } + } + for (ILSMIndex index : indexes) { + //update resource lsn + AbstractLSMIOOperationCallback ioOpCallback = + (AbstractLSMIOOperationCallback) index.getIOOperationCallback(); + ioOpCallback.updateLastLSN(logRecord.getLSN()); + } - try { - //notification will come from LogPage class (notifyFlushTerminator) - logRecord.wait(); - } catch (InterruptedException e) { - throw new HyracksDataException(e); + if (asyncFlush) { + for (ILSMIndex index : indexes) { + ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); + accessor.scheduleFlush(index.getIOOperationCallback()); + } + } else { + for (ILSMIndex index : indexes) { + // TODO: This is not efficient since we flush the indexes sequentially. + // Think of a way to allow submitting the flush requests concurrently. + // We don't do them concurrently because this may lead to a deadlock scenario + // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. 
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); + accessor.scheduleFlush(index.getIOOperationCallback()); + // Wait for the above flush op. + dsInfo.waitForIO(); } } } - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - //update resource lsn - AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); - ioOpCallback.updateLastLSN(logRecord.getLSN()); - } - - if (asyncFlush) { - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); - } - } else { - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - // TODO: This is not efficient since we flush the indexes sequentially. - // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this - // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. 
- flushAndWaitForIO(dsInfo, iInfo); - } - } } private void closeDataset(DatasetResource dsr) throws HyracksDataException { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index c02de7e..8dcae23 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.common.context; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; import org.apache.asterix.common.dataflow.DatasetLocalResource; @@ -41,17 +43,16 @@ */ public class DatasetResource implements Comparable<DatasetResource> { private final DatasetInfo datasetInfo; - private final PrimaryIndexOperationTracker datasetPrimaryOpTracker; private final DatasetVirtualBufferCaches datasetVirtualBufferCaches; - private final ILSMComponentIdGenerator datasetComponentIdGenerator; - public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker, - DatasetVirtualBufferCaches datasetVirtualBufferCaches, - ILSMComponentIdGenerator datasetComponentIdGenerator) { + private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers; + private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators; + + public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches datasetVirtualBufferCaches) { this.datasetInfo = datasetInfo; - this.datasetPrimaryOpTracker = datasetPrimaryOpTracker; this.datasetVirtualBufferCaches = datasetVirtualBufferCaches; - this.datasetComponentIdGenerator = datasetComponentIdGenerator; + this.datasetPrimaryOpTrackers = new HashMap<>(); + this.datasetComponentIdGenerators = new HashMap<>(); } public boolean isRegistered() { @@ -108,7 +109,8 @@ if (index == null) 
{ throw new HyracksDataException("Attempt to register a null index"); } - datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource, + + datasetInfo.addIndex(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource, ((DatasetLocalResource) resource.getResource()).getPartition())); } @@ -116,12 +118,31 @@ return datasetInfo; } - public PrimaryIndexOperationTracker getOpTracker() { - return datasetPrimaryOpTracker; + public PrimaryIndexOperationTracker getOpTracker(int partition) { + return datasetPrimaryOpTrackers.get(partition); } - public ILSMComponentIdGenerator getIdGenerator() { - return datasetComponentIdGenerator; + public Collection<PrimaryIndexOperationTracker> getOpTrackers() { + return datasetPrimaryOpTrackers.values(); + } + + public ILSMComponentIdGenerator getComponentIdGenerator(int partition) { + return datasetComponentIdGenerators.get(partition); + } + + public void setPrimaryIndexOperationTracker(int partition, PrimaryIndexOperationTracker opTracker) { + if (datasetPrimaryOpTrackers.containsKey(partition)) { + throw new IllegalStateException( + "PrimaryIndexOperationTracker has already been set for partition " + partition); + } + datasetPrimaryOpTrackers.put(partition, opTracker); + } + + public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) { + if (datasetComponentIdGenerators.containsKey(partition)) { + throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition); + } + datasetComponentIdGenerators.put(partition, idGenerator); } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 5170310..1a76b66 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -43,6 +43,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker { + private final int partition; // Number of active operations on an ILSMIndex instance. private final AtomicInteger numActiveOperations; private final ILogManager logManager; @@ -50,9 +51,10 @@ private boolean flushOnExit = false; private boolean flushLogCreated = false; - public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo, + public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo, ILSMComponentIdGenerator idGenerator) { super(datasetID, dsInfo); + this.partition = partition; this.logManager = logManager; this.numActiveOperations = new AtomicInteger(); this.idGenerator = idGenerator; @@ -100,7 +102,7 @@ // or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it boolean needsFlush = false; - Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes(); + Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition); if (!flushOnExit) { for (ILSMIndex lsmIndex : indexes) { @@ -146,7 +148,7 @@ //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. 
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { idGenerator.refresh(); - for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { + for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) { //get resource ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE); //update resource lsn @@ -199,4 +201,8 @@ return flushLogCreated; } + public int getPartition() { + return partition; + } + } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java index ed56ab1..5b9883c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java @@ -24,11 +24,13 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; +import org.apache.hyracks.storage.common.IResource; public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { @@ -38,17 +40,20 @@ protected transient INCServiceContext ncCtx; + protected transient IResource resource; + public 
AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) { this.idGeneratorFactory = idGeneratorFactory; } @Override - public void initialize(INCServiceContext ncCtx) { + public void initialize(INCServiceContext ncCtx, IResource resource) { this.ncCtx = ncCtx; + this.resource = resource; } - protected ILSMComponentIdGenerator getComponentIdGenerator() { - return idGeneratorFactory.getComponentIdGenerator(ncCtx); + protected ILSMComponentIdGenerator getComponentIdGenerator() throws HyracksDataException { + return idGeneratorFactory.getComponentIdGenerator(ncCtx, resource); } protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() { @@ -60,7 +65,7 @@ private static final long serialVersionUID = 1L; @Override - public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) { + public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) { // used for backward compatibility // if idGeneratorFactory is not set for legacy lsm indexes, we return a default // component id generator which always generates the missing component id. 
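Reviewer note on the hunks above: the factory now receives the `IResource` so that a partition can be resolved, and the lifecycle manager then hands out one id generator (and one op tracker) per partition, creating it on first request. The following is only a minimal standalone sketch of that lazy per-partition lookup. `PartitionedRegistry` and its methods are hypothetical names chosen for illustration and are not part of AsterixDB; a `StringBuilder` stands in for the tracker or generator object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

/**
 * Hypothetical illustration class (not an AsterixDB type): keeps one lazily
 * created value per partition, similar in spirit to the per-partition
 * getOperationTracker/getComponentIdGenerator lookups in this change.
 */
public class PartitionedRegistry<T> {
    private final Map<Integer, T> byPartition = new HashMap<>();
    private final IntFunction<T> factory;

    public PartitionedRegistry(IntFunction<T> factory) {
        this.factory = factory;
    }

    // synchronized so that two concurrent callers cannot create two values
    // for the same partition
    public synchronized T get(int partition) {
        T value = byPartition.get(partition);
        if (value == null) {
            // first request for this partition: create and remember the value
            value = factory.apply(partition);
            byPartition.put(partition, value);
        }
        return value;
    }

    public synchronized int size() {
        return byPartition.size();
    }

    public static void main(String[] args) {
        PartitionedRegistry<StringBuilder> registry =
                new PartitionedRegistry<>(p -> new StringBuilder("tracker-" + p));
        StringBuilder first = registry.get(0);
        StringBuilder again = registry.get(0);
        StringBuilder other = registry.get(1);
        System.out.println("same partition, same instance: " + (first == again));
        System.out.println("different partition, same instance: " + (first == other));
        System.out.println("partitions populated: " + registry.size());
    }
}
```

The sketch uses an explicit null check under a single lock instead of `computeIfAbsent` only to stay close to the shape of the getters in the diff; either form would serve for an illustration like this one.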
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java index 95245cb..97badb2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -32,7 +33,7 @@ } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java index 6c75ed6..9b32345 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -31,7 +32,7 @@ } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java index fb73d19..766ef95 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -32,7 +33,7 @@ } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java index 94be0bb..3a0afa8 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.ioopcallbacks; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -32,7 +33,7 @@ } @Override - public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) { + public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider()); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java index c4a2d03..a3d5bc5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java @@ -38,11 +38,13 @@ * transaction. * * @param resourceId + * @param partition * @param index * @param callback * @param primaryIndex */ - void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex); + void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback, + boolean primaryIndex); /** * Gets the unique transaction id. @@ -135,8 +137,10 @@ * Called to notify the transaction that an entity commit * log belonging to this transaction has been flushed to * disk. 
+ * + * @param partition */ - void notifyEntityCommitted(); + void notifyEntityCommitted(int partition); /** * Called after an operation is performed on index diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index 3aa7b17..f9f742a 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -22,15 +22,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy; import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.context.IndexInfo; +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; @@ -59,6 +58,8 @@ private final int MAX_COMPONENT_COUNT = 3; private final int DATASET_ID = 1; + + private long nextResourceId = 0; @Test public void testBasic() { @@ -183,19 +184,15 @@ } } - private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) { + private ILSMMergePolicy mockMergePolicy(IndexInfo... 
indexInfos) {
         Map<String, String> properties = new HashMap<>();
         properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
         properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
-        Set<IndexInfo> indexInfos = new HashSet<>();
-        for (IndexInfo info : indexes) {
-            indexInfos.add(info);
+        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+        for (IndexInfo index : indexInfos) {
+            dsInfo.addIndex(index.getResourceId(), index);
         }
-
-        DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class);
-        Mockito.when(dsInfo.getDatsetIndexInfos()).thenReturn(indexInfos);
-
         IDatasetLifecycleManager manager = Mockito.mock(IDatasetLifecycleManager.class);
         Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo);
@@ -238,8 +235,16 @@
         Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
         Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+        if (isPrimary) {
+            PrimaryIndexOperationTracker opTracker = Mockito.mock(PrimaryIndexOperationTracker.class);
+            Mockito.when(opTracker.getPartition()).thenReturn(partition);
+            Mockito.when(index.getOperationTracker()).thenReturn(opTracker);
+        }
         final LocalResource localResource = Mockito.mock(LocalResource.class);
-        return new IndexInfo(index, DATASET_ID, localResource, partition);
+        Mockito.when(localResource.getId()).thenReturn(nextResourceId++);
+        IndexInfo indexInfo = new IndexInfo(index, DATASET_ID, localResource, partition);
+        indexInfo.setOpen(true);
+        return indexInfo;
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 6634e51..e8f2595 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
@@ -474,7 +475,9 @@
             IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
             txnCtx.setWriteTxn(true);
-            txnCtx.register(metadataIndex.getResourceId(), lsmIndex, modCallback, metadataIndex.isPrimaryIndex());
+            txnCtx.register(metadataIndex.getResourceId(),
+                    StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, modCallback,
+                    metadataIndex.isPrimaryIndex());
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
             switch (op) {
                 case INSERT:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 9753bcf..9ebd21b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -63,7 +63,6 @@
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
@@ -476,4 +475,5 @@
     public static void setNewUniverse(boolean isNewUniverse) {
         MetadataBootstrap.isNewUniverse = isNewUniverse;
     }
+
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index ea2d715..8cd7053 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
@@ -818,6 +819,10 @@
     protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException {
         FileSplit[] splitsForDataset = metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
                 getDatasetName());
-        return IntStream.range(0, splitsForDataset.length).toArray();
+        int[] partitions = new int[splitsForDataset.length];
+        for (int i = 0; i < partitions.length; i++) {
+            partitions[i] = StoragePathUtil.getPartitionNumFromRelativePath(splitsForDataset[i].getPath());
+        }
+        return partitions;
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 7913d48..6faffc7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,13 +18,12 @@
  */
 package org.apache.asterix.runtime.job.listener;
-import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
-
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index d057c50..10c6b8f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -73,7 +73,7 @@
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp, operatorNodePushable);
-            txnCtx.register(resource.getId(), index, modCallback, true);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index f40140a..eef1cb0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -21,9 +21,12 @@
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResource;
 public class PrimaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
@@ -36,10 +39,12 @@
     }
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource)
+            throws HyracksDataException {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
-        return dslcManager.getOperationTracker(datasetId);
+        int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        return dslcManager.getOperationTracker(datasetId, partition);
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 26e1b22..a927da0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -69,7 +69,7 @@
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp);
-            txnCtx.register(resource.getId(), index, modCallback, false);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index febcac2..7586980 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -18,12 +18,13 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
-import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.BaseOperationTracker;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResource;
 public class SecondaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
@@ -36,7 +37,7 @@
     }
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
         return new BaseOperationTracker(datasetID, dslcManager.getDatasetInfo(datasetID));
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 1449a1b..8f7d445 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -68,7 +68,7 @@
             IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);
-            txnCtx.register(resource.getId(), index, modCallback, true);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
index 8724128..9ce5843 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
@@ -37,9 +37,8 @@
     @Override
     public IResource createResource(FileReference fileRef) {
-        IResource resource = resourceFactory.createResource(fileRef);
-        // Currently, we get the partition number from the relative path
         int partition = StoragePathUtil.getPartitionNumFromRelativePath(fileRef.getRelativePath());
+        IResource resource = resourceFactory.createResource(fileRef);
         return new DatasetLocalResource(datasetId, partition, resource);
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index bc487fe..a630caa 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -222,7 +222,7 @@
                     txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
                     txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(), LockMode.ANY,
                             txnCtx);
-                    txnCtx.notifyEntityCommitted();
+                    txnCtx.notifyEntityCommitted(logRecord.getResourcePartition());
                     if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
                         txnSubsystem.incrementEntityCommitCount();
                     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 43fe266..b3d5e49 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -118,7 +118,7 @@
     }
     @Override
-    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+    public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
         synchronized (txnOpTrackers) {
             if (!txnOpTrackers.containsKey(resourceId)) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 1d132a8..219cf07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -44,9 +44,9 @@
     }
     @Override
-    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+    public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
-        super.register(resourceId, index, callback, primaryIndex);
+        super.register(resourceId, partition, index, callback, primaryIndex);
         synchronized (txnOpTrackers) {
             if (primaryIndex && !opTrackers.containsKey(resourceId)) {
                 opTrackers.put(resourceId, index.getOperationTracker());
@@ -67,7 +67,7 @@
     }
     @Override
-    public void notifyEntityCommitted() {
+    public void notifyEntityCommitted(int partition) {
         throw new IllegalStateException("Unexpected entity commit in atomic transaction");
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index e195451..9d2f54b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -18,11 +18,15 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -32,30 +36,36 @@
 @ThreadSafe
 public class EntityLevelTransactionContext extends AbstractTransactionContext {
-    private PrimaryIndexOperationTracker primaryIndexOpTracker;
-    private IModificationOperationCallback primaryIndexCallback;
-    private final AtomicInteger pendingOps;
+    private final Map<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>> primaryIndexTrackers;
+    private final Map<Long, AtomicInteger> resourcePendingOps;
+    private final Map<Integer, AtomicInteger> partitionPendingOps;
     public EntityLevelTransactionContext(TxnId txnId) {
         super(txnId);
-        pendingOps = new AtomicInteger(0);
+        this.primaryIndexTrackers = new HashMap<>();
+        this.resourcePendingOps = new HashMap<>();
+        this.partitionPendingOps = new HashMap<>();
     }
     @Override
-    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+    public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
-        super.register(resourceId, index, callback, primaryIndex);
+        super.register(resourceId, partition, index, callback, primaryIndex);
         synchronized (txnOpTrackers) {
-            if (primaryIndex && primaryIndexOpTracker == null) {
-                primaryIndexCallback = callback;
-                primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+            AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
+            resourcePendingOps.put(resourceId, pendingOps);
+            if (primaryIndex) {
+                Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+                        new Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>(
+                                (PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
+                primaryIndexTrackers.put(partition, pair);
             }
         }
     }
     @Override
     public void beforeOperation(long resourceId) {
-        pendingOps.incrementAndGet();
+        resourcePendingOps.get(resourceId).incrementAndGet();
     }
     @Override
@@ -64,9 +74,11 @@
     }
     @Override
-    public void notifyEntityCommitted() {
+    public void notifyEntityCommitted(int partition) {
         try {
-            primaryIndexOpTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, primaryIndexCallback);
+            Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+                    primaryIndexTrackers.get(partition);
+            pair.first.completeOperation(null, LSMOperationType.MODIFICATION, null, pair.second);
         } catch (HyracksDataException e) {
             throw new ACIDException(e);
         }
@@ -74,13 +86,15 @@
     @Override
     public void afterOperation(long resourceId) {
-        pendingOps.decrementAndGet();
+        resourcePendingOps.get(resourceId).decrementAndGet();
     }
     @Override
     protected void cleanupForAbort() {
-        if (primaryIndexOpTracker != null) {
-            primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
+        for (Entry<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>> e : primaryIndexTrackers
+                .entrySet()) {
+            AtomicInteger pendingOps = partitionPendingOps.get(e.getKey());
+            e.getValue().first.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 673bd3b..445f363 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -57,11 +57,11 @@
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, this);
         return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits,
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+                opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, durable, metadataPageManagerFactory, serviceCtx.getTracer());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 7e44c63..acdb09e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -60,11 +60,11 @@
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, this);
         return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+                opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, bloomFilterKeyFields, durable, metadataPageManagerFactory, serviceCtx.getTracer());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 1988736..f0b86d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -70,13 +70,13 @@
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, this);
         //TODO: enable updateAwareness for secondary LSMBTree indexes
         boolean updateAware = false;
         return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits,
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+                opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, durable,
                 metadataPageManagerFactory, updateAware, serviceCtx.getTracer());
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 280803d..92d74d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -94,7 +94,8 @@
             ILSMDiskComponentFactory bulkLoadComponentFactory, ILSMDiskComponentFactory transactionComponentFactory,
             double bloomFilterFalsePositiveRate, IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
             ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, ITracer tracer) {
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, ITracer tracer)
+            throws HyracksDataException {
         super(ioManager, insertLeafFrameFactory, deleteLeafFrameFactory, bufferCache, fileManager, componentFactory,
                 bulkLoadComponentFactory, bloomFilterFalsePositiveRate, cmpFactories, mergePolicy, opTracker,
                 ioScheduler, ioOpCallbackFactory, false, durable, tracer);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 5f04c0a..6993013 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -89,8 +89,8 @@
             ILSMDiskComponentFactory bulkLoadComponentFactory, double bloomFilterFalsePositiveRate,
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, IBinaryComparatorFactory[] btreeCmpFactories,
-            IBinaryComparatorFactory[] buddyBtreeCmpFactories, int[] buddyBTreeFields, boolean durable,
-            ITracer tracer) {
+            IBinaryComparatorFactory[] buddyBtreeCmpFactories, int[] buddyBTreeFields, boolean durable, ITracer tracer)
+            throws HyracksDataException {
         super(ioManager, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker,
                 ioScheduler, ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable, tracer);
         this.btreeCmpFactories = btreeCmpFactories;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 0593ad5..b4990d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -125,7 +125,7 @@
             double bloomFilterFalsePositiveRate, IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
             ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, boolean durable,
-            ITracer tracer) {
+            ITracer tracer) throws HyracksDataException {
         super(ioManager, bufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler,
                 ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable, tracer);
         this.insertLeafFrameFactory = insertLeafFrameFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index 4706faa..cc10a98 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -135,7 +135,7 @@
             int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
             ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable,
-            IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer) {
+            IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer) throws HyracksDataException {
         LSMBTreeTupleWriterFactory insertTupleWriterFactory =
                 new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, false, false);
         LSMBTreeTupleWriterFactory deleteTupleWriterFactory =
@@ -187,8 +187,8 @@
             IBufferCache diskBufferCache, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
             double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
             ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            int[] buddyBTreeFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory,
-            ITracer tracer) {
+            int[] buddyBTreeFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer)
+            throws HyracksDataException {
         ITypeTraits[] buddyBtreeTypeTraits = new ITypeTraits[buddyBTreeFields.length];
         IBinaryComparatorFactory[] buddyBtreeCmpFactories = new IBinaryComparatorFactory[buddyBTreeFields.length];
         for (int i = 0; i < buddyBtreeTypeTraits.length; i++) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
index c0f530b..4ec82c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
@@ -21,9 +21,11 @@
 import java.io.Serializable;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
 @FunctionalInterface
 public interface ILSMComponentIdGeneratorFactory extends Serializable {
-
-    ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx);
+    ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource)
+            throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
index a9dc50e..e8742b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -21,14 +21,16 @@
 import java.io.Serializable;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
 public interface ILSMIOOperationCallbackFactory extends Serializable {
     /**
-     * Initialize the callback factory with the given ncCtx
+     * Initialize the callback factory with the given ncCtx and resource
      *
      * @param ncCtx
      */
-    void initialize(INCServiceContext ncCtx);
+    void initialize(INCServiceContext ncCtx, IResource resource);
-    ILSMIOOperationCallback createIoOpCallback(ILSMIndex index);
+    ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
index 217f794..ef22620 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
@@ -21,8 +21,10 @@
 import java.io.Serializable;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
 @FunctionalInterface
 public interface ILSMOperationTrackerFactory extends Serializable {
-    ILSMOperationTracker getOperationTracker(INCServiceContext ctx);
+    ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 6c1ef55..5a95af0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -112,7 +112,7 @@
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMDiskComponentFactory componentFactory,
             ILSMDiskComponentFactory bulkLoadComponentFactory, ILSMComponentFilterFrameFactory filterFrameFactory,
             LSMComponentFilterManager filterManager, int[] filterFields, boolean durable,
-            IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer) {
+            IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer) throws HyracksDataException {
         this.ioManager = ioManager;
         this.virtualBufferCaches = virtualBufferCaches;
         this.diskBufferCache = diskBufferCache;
@@ -146,7 +146,7 @@
             double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
             ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
             ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory,
-            boolean durable, ITracer tracer) {
+            boolean durable, ITracer tracer) throws HyracksDataException {
         this.ioManager = ioManager;
         this.diskBufferCache = diskBufferCache;
         this.fileManager = fileManager;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
index 728c90a..d288ec8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.hyracks.storage.common.IResource;
 /**
  * A default implementation of {@link ILSMComponentIdGeneratorFactory}.
@@ -32,7 +33,7 @@
     private static final long serialVersionUID = 1L;
     @Override
-    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) {
         return new LSMComponentIdGenerator();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 3a58c19..eec2dca 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.common.IResource;
 public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
     INSTANCE;
@@ -39,7 +40,7 @@
     }
     @Override
-    public void initialize(INCServiceContext ncCtx) {
+    public void
initialize(INCServiceContext ncCtx, IResource resource) { // No op } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java index 55a2164..8d8e763 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java @@ -25,6 +25,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.IResource; import org.apache.hyracks.storage.common.ISearchOperationCallback; /** @@ -43,7 +44,7 @@ } @Override - public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) { + public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) { return tracker; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java index d01e7ba..6c17ee8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java @@ -21,6 +21,7 @@ import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; +import org.apache.hyracks.storage.common.IResource; public class ThreadCountingOperationTrackerFactory implements ILSMOperationTrackerFactory { @@ -32,7 +33,7 @@ } @Override - public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) { + public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) { return new ThreadCountingTracker(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java index a45f006..4eb7728 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java @@ -87,19 +87,19 @@ IBufferCache bufferCache = storageManager.getBufferCache(serviceCtx); ILSMMergePolicy mergePolicy = mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx); ILSMIOOperationScheduler ioScheduler = ioSchedulerProvider.getIoScheduler(serviceCtx); - ioOpCallbackFactory.initialize(serviceCtx); + ioOpCallbackFactory.initialize(serviceCtx, this); if (isPartitioned) { return InvertedIndexUtils.createPartitionedLSMInvertedIndex(ioManager, 
virtualBufferCaches, typeTraits, cmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache, file.getAbsolutePath(), bloomFilterFalsePositiveRate, mergePolicy, - opTrackerProvider.getOperationTracker(serviceCtx), ioScheduler, ioOpCallbackFactory, + opTrackerProvider.getOperationTracker(serviceCtx, this), ioScheduler, ioOpCallbackFactory, invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, durable, metadataPageManagerFactory); } else { return InvertedIndexUtils.createLSMInvertedIndex(ioManager, virtualBufferCaches, typeTraits, cmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache, file.getAbsolutePath(), - bloomFilterFalsePositiveRate, mergePolicy, opTrackerProvider.getOperationTracker(serviceCtx), + bloomFilterFalsePositiveRate, mergePolicy, opTrackerProvider.getOperationTracker(serviceCtx, this), ioScheduler, ioOpCallbackFactory, invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, durable, metadataPageManagerFactory); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java index 9960590..f72e17c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java @@ -66,13 +66,13 @@ public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException { IIOManager ioManager = ncServiceCtx.getIoManager(); FileReference 
fileRef = ioManager.resolve(path); - ioOpCallbackFactory.initialize(ncServiceCtx); + ioOpCallbackFactory.initialize(ncServiceCtx, this); return LSMRTreeUtils.createExternalRTree(ioManager, fileRef, storageManager.getBufferCache(ncServiceCtx), typeTraits, cmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType, bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx), - opTrackerProvider.getOperationTracker(ncServiceCtx), ioSchedulerProvider.getIoScheduler(ncServiceCtx), - ioOpCallbackFactory, linearizeCmpFactory, buddyBTreeFields, durable, isPointMBR, - metadataPageManagerFactory, ncServiceCtx.getTracer()); + opTrackerProvider.getOperationTracker(ncServiceCtx, this), + ioSchedulerProvider.getIoScheduler(ncServiceCtx), ioOpCallbackFactory, linearizeCmpFactory, + buddyBTreeFields, durable, isPointMBR, metadataPageManagerFactory, ncServiceCtx.getTracer()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java index f6396cf..634504b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java @@ -83,13 +83,14 @@ IIOManager ioManager = ncServiceCtx.getIoManager(); FileReference fileRef = ioManager.resolve(path); List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(ncServiceCtx, fileRef); - ioOpCallbackFactory.initialize(ncServiceCtx); + ioOpCallbackFactory.initialize(ncServiceCtx, this); return LSMRTreeUtils.createLSMTree(ioManager, virtualBufferCaches, fileRef, 
storageManager.getBufferCache(ncServiceCtx), typeTraits, cmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType, bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx), - opTrackerProvider.getOperationTracker(ncServiceCtx), ioSchedulerProvider.getIoScheduler(ncServiceCtx), - ioOpCallbackFactory, linearizeCmpFactory, rtreeFields, buddyBTreeFields, filterTypeTraits, - filterCmpFactories, filterFields, durable, isPointMBR, metadataPageManagerFactory); + opTrackerProvider.getOperationTracker(ncServiceCtx, this), + ioSchedulerProvider.getIoScheduler(ncServiceCtx), ioOpCallbackFactory, linearizeCmpFactory, rtreeFields, + buddyBTreeFields, filterTypeTraits, filterCmpFactories, filterFields, durable, isPointMBR, + metadataPageManagerFactory); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java index f586d51..f91a5f7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java @@ -78,12 +78,12 @@ IIOManager ioManager = serviceCtx.getIoManager(); FileReference file = ioManager.resolve(path); List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(serviceCtx, file); - ioOpCallbackFactory.initialize(serviceCtx); + ioOpCallbackFactory.initialize(serviceCtx, this); return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(ioManager, virtualBufferCaches, file, storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, 
btreeComparatorFactories, valueProviderFactories, rtreePolicyType, mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), - opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx), + opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, linearizeCmpFactory, rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, durable, isPointMBR, metadataPageManagerFactory); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java index aac8905..ee37043 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java @@ -127,7 +127,7 @@ ILinearizeComparatorFactory linearizer, int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, - boolean isPointMBR, ITracer tracer) { + boolean isPointMBR, ITracer tracer) throws HyracksDataException { super(ioManager, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, componentFactory, componentFactory, durable, tracer); this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index e23a83e..f29bffc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -87,7 +87,7 @@ int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, int[] buddyBTreeFields, boolean durable, - boolean isPointMBR, ITracer tracer) { + boolean isPointMBR, ITracer tracer) throws HyracksDataException { super(ioManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bloomFilterFalsePositiveRate, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields, linearizerArray, mergePolicy, diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 123b38d..7172b74 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -100,7 +100,7 @@ int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, int[] buddyBTreeFields, boolean durable, - boolean 
isPointMBR, ITracer tracer) { + boolean isPointMBR, ITracer tracer) throws HyracksDataException { super(ioManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields, linearizerArray, bloomFilterFalsePositiveRate, diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java index 1af6779..9f17efa 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java @@ -49,7 +49,7 @@ harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes), SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), - NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(), + NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE); } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java index 0904806..77d52bb 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java @@ -59,7 +59,7 @@ harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes), SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), - NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(), + NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE); } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java index ccbaa9c..d42c3b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java @@ -71,7 +71,7 @@ harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes), 
SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), - NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(), + NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, null, null, null, null, true, harness.getMetadataPageManagerFactory(), true, ITracer.NONE); } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java index 9b53120..3ce24b0 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java @@ -67,11 +67,11 @@ vbcs.add(i, new TestVirtualBufferCache(vbc)); } } - ioOpCallbackFactory.initialize(serviceCtx); + ioOpCallbackFactory.initialize(serviceCtx, this); return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), - opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx), + opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, durable, metadataPageManagerFactory, false, serviceCtx.getTracer()); 
} -- To view, visit https://asterix-gerrit.ics.uci.edu/2263 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9 Gerrit-PatchSet: 23 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
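Note on the factory signature changes in this patch: `getOperationTracker`, `getComponentIdGenerator`, and `initialize` now receive the resource, which lets an implementation choose a tracker per partition instead of one per NC. The following is a minimal sketch of that idea, not the code from this change. The interfaces are simplified stand-ins for the Hyracks types, and `getPartition()` on the resource is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerPartitionTrackerSketch {

    /** Hypothetical stand-in for IResource; assumes the resource can report its partition. */
    interface PartitionedResource {
        int getPartition();
    }

    /** Simplified stand-in for ILSMOperationTracker. */
    interface OperationTracker {
        int getPartition();
    }

    /** Mirrors the shape of the new two-argument factory method; the context is unused here. */
    interface OperationTrackerFactory {
        OperationTracker getOperationTracker(Object serviceCtx, PartitionedResource resource);
    }

    static final class PartitionTracker implements OperationTracker {
        private final int partition;

        PartitionTracker(int partition) {
            this.partition = partition;
        }

        @Override
        public int getPartition() {
            return partition;
        }
    }

    /** Hands out one tracker per partition and reuses it for every index in that partition. */
    static final class PerPartitionFactory implements OperationTrackerFactory {
        private final Map<Integer, OperationTracker> trackers = new ConcurrentHashMap<>();

        @Override
        public OperationTracker getOperationTracker(Object serviceCtx, PartitionedResource resource) {
            // computeIfAbsent creates the tracker at most once per partition key.
            return trackers.computeIfAbsent(resource.getPartition(), PartitionTracker::new);
        }
    }

    public static void main(String[] args) {
        PerPartitionFactory factory = new PerPartitionFactory();
        OperationTracker first = factory.getOperationTracker(null, () -> 0);
        OperationTracker second = factory.getOperationTracker(null, () -> 0);
        OperationTracker other = factory.getOperationTracker(null, () -> 1);
        System.out.println("same partition shares tracker: " + (first == second));
        System.out.println("different partition gets its own: " + (first != other));
    }
}
```

With the old one-argument signature the factory had no way to tell indexes in different partitions apart, so a lookup like this could only be keyed on the node.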
