Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][STO] Ensure First Component ID is Initialized ......................................................................
[NO ISSUE][STO] Ensure First Component ID is Initialized - user model changes: no - storage format changes: no - interface changes: yes Details: - Initialize the component id generator from the primary index checkpoint, if it exists, as soon as it is created. - Ensure the first component id is passed to all indexes. Change-Id: I246f9373f950e2f9a2c63f86746462e42a3f1c62 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2948 Reviewed-by: abdullah alamoudi <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java 14 files changed, 88 insertions(+), 95 deletions(-) Approvals: Anon. E. Moose #1000171: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index adf9960..946815f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -300,7 +300,6 @@ TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false); ILogRecord logRecord = null; - ILSMComponentIdGenerator idGenerator = null; try { logReader.setPosition(lowWaterMarkLSN); logRecord = logReader.next(); @@ -389,8 +388,7 @@ int partition = logRecord.getResourcePartition(); if (partitions.contains(partition)) { int datasetId = logRecord.getDatasetId(); - idGenerator = datasetLifecycleManager.getComponentIdGenerator(datasetId, partition); - if (idGenerator == null) { + if (!datasetLifecycleManager.isRegistered(datasetId)) { // it's possible this dataset has been dropped logRecord = logReader.next(); continue; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 017c59f..5aa1b36 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -56,6 +56,7 
@@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; +import org.apache.hyracks.test.support.TestUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -75,6 +76,7 @@ private static ITransactionContext txnCtx; private static LSMInsertDeleteOperatorNodePushable insertOp; private static final int PARTITION = 0; + private static String indexPath; @BeforeClass public static void setUp() throws Exception { @@ -109,6 +111,7 @@ txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); insertOp = StorageTestUtils.getInsertPipeline(nc, ctx); + indexPath = indexDataflowHelper.getResource().getPath(); } @After @@ -148,9 +151,9 @@ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh(); ILSMComponentId next = - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId(); long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN(); Map<String, Object> flushMap = new HashMap<>(); flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); @@ -163,8 +166,8 @@ // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = 
LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); - next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh(); + next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId(); flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN(); flushMap = new HashMap<>(); flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); @@ -216,9 +219,9 @@ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh(); ILSMComponentId next = - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId(); long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN(); Map<String, Object> flushMap = new HashMap<>(); flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); @@ -248,8 +251,8 @@ // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); - next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId(); + 
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh(); + next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId(); flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN(); flushMap = new HashMap<>(); flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); @@ -302,9 +305,9 @@ firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh(); ILSMComponentId next = - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId(); long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN(); Map<String, Object> flushMap = new HashMap<>(); flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); @@ -329,8 +332,8 @@ secondSearcher.waitUntilEntered(); lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); - next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh(); + next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId(); flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN(); flushMap = new HashMap<>(); flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); @@ -738,9 +741,10 @@ public void run() { 
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); try { - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh(); - ILSMComponentId next = - dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath) + .refresh(); + ILSMComponentId next = dsLifecycleMgr + .getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId(); long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN(); Map<String, Object> flushMap = new HashMap<>(); flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java index f4c0ea6..c727f52 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java @@ -127,7 +127,7 @@ public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider checkpointManagerProvider) { - super(dsInfo, index, idGenerator, checkpointManagerProvider); + super(dsInfo, index, idGenerator.getId(), checkpointManagerProvider); lsmBtree = (TestLsmBtree) index; } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java index 22ee542..c4390fa 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java +++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java @@ -55,12 +55,13 @@ DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager(); DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId); int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); - PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition); + PrimaryIndexOperationTracker opTracker = + dslcManager.getOperationTracker(datasetId, partition, resource.getPath()); if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) { Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers"); opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition, appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), - dslcManager.getComponentIdGenerator(datasetId, partition)); + dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath())); replaceMapEntry(opTrackersField, dsr, partition, opTracker); } return opTracker; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 4441c6e..d18b6ab 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -40,6 +40,15 @@ IIndex getIndex(int datasetId, long indexId) throws HyracksDataException; /** + * Indicates if the dataset with id {@code datasetId} is currently registered + * with this {@link IDatasetLifecycleManager} + * + * @param datasetId + * @return true if the dataset is currently registered. Otherwise false. + */ + boolean isRegistered(int datasetId); + + /** * Flushes all open datasets synchronously. 
* * @throws HyracksDataException @@ -76,18 +85,20 @@ * * @param datasetId * @param partition + * @param path * @return */ - PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition); + PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path); /** * creates (if necessary) and returns the component Id generator of a dataset. * * @param datasetId * @param partition + * @param path * @return */ - ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition); + ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path); /** * creates (if necessary) and returns the dataset virtual buffer caches. diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java index 6476e68..811da78 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java @@ -53,7 +53,7 @@ IDatasetLifecycleManager dslcManager = ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager(); int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); - return dslcManager.getComponentIdGenerator(datasetId, partition); + return dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()); } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 682eaea..1dff69d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -38,7 +38,9 @@ import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogType; @@ -113,9 +115,6 @@ datasetResource = getDatasetLifecycle(did); } datasetResource.register(resource, (ILSMIndex) index); - if (((ILSMIndex) index).isPrimaryIndex()) { - initializeDatasetPartitionValidComponentId(datasetResource, resource); - } } private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException { @@ -326,32 +325,36 @@ } @Override - public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) { + public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) { DatasetResource dataset = datasets.get(datasetId); PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition); if (opTracker == null) { - populateOpTrackerAndIdGenerator(dataset, partition); + populateOpTrackerAndIdGenerator(dataset, partition, path); opTracker = dataset.getOpTracker(partition); } return opTracker; } @Override - public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) { + public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) { DatasetResource dataset = datasets.get(datasetId); - if (dataset == null) { - return null; - } ILSMComponentIdGenerator generator = 
dataset.getComponentIdGenerator(partition); if (generator == null) { - populateOpTrackerAndIdGenerator(dataset, partition); + populateOpTrackerAndIdGenerator(dataset, partition, path); generator = dataset.getComponentIdGenerator(partition); } return generator; } - private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) { - ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum()); + @Override + public synchronized boolean isRegistered(int datasetId) { + return datasets.containsKey(datasetId); + } + + private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition, String path) { + final long lastValidId = getDatasetLastValidComponentId(path); + ILSMComponentIdGenerator idGenerator = + new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), lastValidId); PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition, logManager, dataset.getDatasetInfo(), idGenerator); dataset.setPrimaryIndexOperationTracker(partition, opTracker); @@ -605,21 +608,15 @@ } } - private void initializeDatasetPartitionValidComponentId(DatasetResource datasetResource, - LocalResource primaryIndexResource) { - final IndexInfo indexInfo = datasetResource.getIndexInfo(primaryIndexResource.getId()); - final int partition = indexInfo.getPartition(); - final ILSMComponentIdGenerator componentIdGenerator = - getComponentIdGenerator(datasetResource.getDatasetID(), partition); - final long indexLastValidComponentId = getIndexLastValidComponentId(indexInfo.getLocalResource()); - componentIdGenerator.init(indexLastValidComponentId); - } - - private long getIndexLastValidComponentId(LocalResource resource) { + private long getDatasetLastValidComponentId(String indexPath) { try { - final DatasetResourceReference datasetResource = DatasetResourceReference.of(resource); - return 
Math.max(indexCheckpointManagerProvider.get(datasetResource).getLatest().getLastComponentId(), - MIN_VALID_COMPONENT_ID); + final ResourceReference indexRef = ResourceReference.ofIndex(indexPath); + final ResourceReference primaryIndexRef = indexRef.getDatasetReference(); + final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(primaryIndexRef); + if (indexCheckpointManager.getCheckpointCount() > 0) { + return Math.max(indexCheckpointManager.getLatest().getLastComponentId(), MIN_VALID_COMPONENT_ID); + } + return MIN_VALID_COMPONENT_ID; } catch (HyracksDataException e) { throw new IllegalStateException(e); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java index 606d63c..53bc31b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java @@ -37,7 +37,6 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; @@ -69,19 +68,18 @@ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; protected final DatasetInfo dsInfo; protected final ILSMIndex lsmIndex; - private final ILSMComponentIdGenerator componentIdGenerator; private long firstLsnForCurrentMemoryComponent = 0L; private long persistenceLsn = 0L; private int 
pendingFlushes = 0; private Deque<ILSMComponentId> componentIds = new ArrayDeque<>(); private boolean firstAllocation = true; - public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentIdGenerator componentIdGenerator, + public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { this.dsInfo = dsInfo; this.lsmIndex = lsmIndex; - this.componentIdGenerator = componentIdGenerator; this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; + componentIds.add(componentId); } @Override @@ -278,9 +276,6 @@ @Override public void allocated(ILSMMemoryComponent component) throws HyracksDataException { - if (firstAllocation) { - firstAllocation = false; - componentIds.add(componentIdGenerator.getId()); - } + // no op } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java index 68ffd6a..25cd8b2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java @@ -70,8 +70,8 @@ @Override public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { - return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(), - getIndexCheckpointManagerProvider()); + return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, + getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider()); } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java index 8bd848d..a24bf72 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java @@ -38,6 +38,10 @@ protected ResourceReference() { } + public static ResourceReference ofIndex(String indexPath) { + return of(new File(indexPath, StorageConstants.METADATA_FILE_NAME).toString()); + } + public static ResourceReference of(String localResourcePath) { ResourceReference lrr = new ResourceReference(); parse(lrr, localResourcePath); @@ -72,6 +76,11 @@ return Paths.get(root, partition, dataverse, dataset, rebalance, index); } + public ResourceReference getDatasetReference() { + return ResourceReference + .ofIndex(Paths.get(root, partition, dataverse, dataset, rebalance, dataset).toFile().getPath()); + } + public Path getFileRelativePath() { return Paths.get(root, partition, dataverse, dataset, rebalance, index, name); } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java index 4b18e1d..db0911b 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java @@ -78,10 +78,9 @@ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); DatasetInfo dsInfo = new DatasetInfo(101, null); - LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents); - idGenerator.init(MIN_VALID_COMPONENT_ID); - LSMIOOperationCallback 
callback = - new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); + LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); + LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), + mockIndexCheckpointManagerProvider()); //Flush first idGenerator.refresh(); long flushLsn = 1L; @@ -138,19 +137,19 @@ public void testAllocateComponentId() throws HyracksDataException { int numMemoryComponents = 2; DatasetInfo dsInfo = new DatasetInfo(101, null); - ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents); - idGenerator.init(MIN_VALID_COMPONENT_ID); + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); - LSMIOOperationCallback callback = - new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); - callback.allocated(mockComponent); + LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), + mockIndexCheckpointManagerProvider()); ILSMComponentId initialId = idGenerator.getId(); + // simulate a partition is flushed before allocated idGenerator.refresh(); long flushLsn = 1L; ILSMComponentId nextComponentId = idGenerator.getId(); + callback.allocated(mockComponent); callback.recycled(mockComponent); checkMemoryComponent(initialId, mockComponent); } @@ -159,16 +158,14 @@ public void testRecycleComponentId() throws HyracksDataException { int numMemoryComponents = 2; DatasetInfo dsInfo = new DatasetInfo(101, null); - ILSMComponentIdGenerator idGenerator = 
new LSMComponentIdGenerator(numMemoryComponents); - idGenerator.init(MIN_VALID_COMPONENT_ID); + ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); - LSMIOOperationCallback callback = - new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); + LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), + mockIndexCheckpointManagerProvider()); String indexId = "mockIndexId"; - callback.allocated(mockComponent); ILSMComponentId id = idGenerator.getId(); callback.recycled(mockComponent); checkMemoryComponent(id, mockComponent); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java index e353714..ab84211 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java @@ -49,7 +49,7 @@ IDatasetLifecycleManager dslcManager = ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager(); int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); - return dslcManager.getOperationTracker(datasetId, partition); + return dslcManager.getOperationTracker(datasetId, partition, resource.getPath()); } @Override diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java index 0af06d7..a5c6360 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java @@ -38,11 +38,4 @@ * @return the index of the current memory component */ int getCurrentComponentIndex(); - - /** - * Initializes this {@link ILSMComponentIdGenerator} by setting the last used id - * - * @param lastUsedId - */ - void init(long lastUsedId); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java index e6bf0ab..21a27a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java @@ -33,25 +33,16 @@ private int currentComponentIndex; private long lastUsedId; private ILSMComponentId componentId; - private boolean initialized = false; - public LSMComponentIdGenerator(int numComponents) { + public LSMComponentIdGenerator(int numComponents, long lastUsedId) { this.numComponents = numComponents; - } - - @Override - public synchronized void init(long lastUsedId) { this.lastUsedId = lastUsedId; - initialized = true; refresh(); currentComponentIndex = 
0; } @Override public synchronized void refresh() { - if (!initialized) { - throw new IllegalStateException("Attempt to refresh component id before initialziation."); - } final long nextId = ++lastUsedId; componentId = new LSMComponentId(nextId, nextId); currentComponentIndex = (currentComponentIndex + 1) % numComponents; @@ -59,9 +50,6 @@ @Override public synchronized ILSMComponentId getId() { - if (!initialized) { - throw new IllegalStateException("Attempt to get component id before initialziation."); - } return componentId; } -- To view, visit https://asterix-gerrit.ics.uci.edu/2948 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I246f9373f950e2f9a2c63f86746462e42a3f1c62 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: stabilization-f69489 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
