Murtadha Hubail has submitted this change and it was merged. Change subject: ASTERIXDB-1058: make Asterix compatible with lazy LSM memory allocation ......................................................................
ASTERIXDB-1058: make Asterix compatible with lazy LSM memory allocation - Adapt memory budget calculation to lazy LSM memory allocation. - Add IDatasetLifecycleManager interface. Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2 Reviewed-on: https://asterix-gerrit.ics.uci.edu/408 Tested-by: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> Reviewed-by: Young-Seok Kim <[email protected]> --- M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java M asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java M asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java A asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java M asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java M asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java M asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java M asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java M asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java M asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java M asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java M asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java M asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java M asterix-transactions/pom.xml M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java 33 files changed, 354 insertions(+), 249 deletions(-) Approvals: Young-Seok Kim: Looks good to me, approved Ian Maxon: Looks good to me, approved Jenkins: Verified diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java index 2a15384..2e7c23f 100644 --- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java +++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.api.AsterixThreadExecutor; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.AsterixCompilerProperties; import org.apache.asterix.common.config.AsterixExternalProperties; import org.apache.asterix.common.config.AsterixFeedProperties; @@ -42,16 +43,13 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.feeds.FeedManager; import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; -import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -99,7 +97,7 @@ private AsterixBuildProperties buildProperties; private AsterixThreadExecutor threadExecutor; - private DatasetLifecycleManager indexLifecycleManager; + private IDatasetLifecycleManager datasetLifecycleManager; private IFileMapManager fileMapManager; private IBufferCache bufferCache; private ITransactionSubsystem txnSubsystem; @@ -112,7 +110,7 @@ private IFeedManager feedManager; - public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) throws AsterixException { + public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) { this.ncApplicationContext = ncApplicationContext; compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR); externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR); @@ -143,8 +141,7 @@ ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory( ioManager, ncApplicationContext.getNodeId()); - localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory - .createRepository(); + localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository(); resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory(); IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery( @@ -152,8 +149,8 @@ txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider, txnProperties); - indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository, - MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, (LogManager) txnSubsystem.getLogManager()); + datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository, + MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager()); isShuttingdown = false; @@ -165,7 +162,7 @@ lccm.register((ILifeCycleComponent) bufferCache); lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager()); lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager()); - lccm.register((ILifeCycleComponent) indexLifecycleManager); + lccm.register((ILifeCycleComponent) datasetLifecycleManager); lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager()); lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager()); } @@ -193,8 +190,8 @@ return txnSubsystem; } - public IIndexLifecycleManager getIndexLifecycleManager() { - return indexLifecycleManager; + public IDatasetLifecycleManager getDatasetLifecycleManager() { + return datasetLifecycleManager; } public double getBloomFilterFalsePositiveRate() { @@ -258,12 +255,12 @@ @Override public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) { - return indexLifecycleManager.getVirtualBufferCaches(datasetID); + return datasetLifecycleManager.getVirtualBufferCaches(datasetID); } @Override public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) { - return indexLifecycleManager.getOperationTracker(datasetID); + return datasetLifecycleManager.getOperationTracker(datasetID); } @Override diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java index 1d504dd..570c3c9 100644 --- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java +++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java @@ -22,10 +22,10 @@ import org.apache.asterix.common.api.AsterixThreadExecutor; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; @@ -58,8 +58,8 @@ } @Override - public IIndexLifecycleManager getIndexLifecycleManager() { - return asterixAppRuntimeContext.getIndexLifecycleManager(); + public IDatasetLifecycleManager getDatasetLifecycleManager() { + return asterixAppRuntimeContext.getDatasetLifecycleManager(); } @Override diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java index 3c5549f..f86ed8a 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java @@ -28,7 +28,6 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -60,7 +59,7 @@ public ILocalResourceRepository getLocalResourceRepository(); - public IIndexLifecycleManager getIndexLifecycleManager(); + public IDatasetLifecycleManager getDatasetLifecycleManager(); public ResourceIdFactory getResourceIdFactory(); diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java index f978a8a..1e4869e 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java @@ -19,7 +19,6 @@ package org.apache.asterix.common.api; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; @@ -35,7 +34,7 @@ public IFileMapProvider getFileMapManager(); - public IIndexLifecycleManager getIndexLifecycleManager(); + public IDatasetLifecycleManager getDatasetLifecycleManager(); public double getBloomFilterFalsePositiveRate(); diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java new file mode 100644 index 0000000..86e6db5 --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import java.util.List; + +import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IIndex; +import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; +import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; + +public interface IDatasetLifecycleManager extends IIndexLifecycleManager { + /** + * @param datasetID + * @param resourceID + * @return The corresponding index, or null if it is not found in the registered indexes. + * @throws HyracksDataException + */ + IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException; + + /** + * Allocates the memory budget of the dataset in the virtual buffer cache. + * @param datasetID + * @throws HyracksDataException + */ + void allocateDatasetMemory(int datasetID) throws HyracksDataException; + + /** + * Flushes all open datasets synchronously. + * @throws HyracksDataException + */ + void flushAllDatasets() throws HyracksDataException; + + /** + * Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN. + * @param nonSharpCheckpointTargetLSN + * @throws HyracksDataException + */ + void scheduleAsyncFlushForLaggingDatasets(long nonSharpCheckpointTargetLSN) throws HyracksDataException; + + /** + * creates (if necessary) and returns the dataset info. + * @param datasetID + * @return + */ + DatasetInfo getDatasetInfo(int datasetID); + + /** + * @param datasetId + * the dataset id to be flushed. + * @param asyncFlush + * a flag indicating whether to wait for the flush to complete or not. + * @throws HyracksDataException + */ + void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException; + + /** + * creates (if necessary) and returns the primary index operation tracker of a dataset. + * @param datasetID + * @return + */ + ILSMOperationTracker getOperationTracker(int datasetID); + + /** + * creates (if necessary) and returns the dataset virtual buffer caches. + * @param datasetID + * @return + */ + List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID); +} diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java index 3b824ec..d78e2cb 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.context; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback; @@ -28,11 +29,11 @@ public class BaseOperationTracker implements ILSMOperationTracker { - protected final DatasetLifecycleManager datasetLifecycleManager; + protected final IDatasetLifecycleManager datasetLifecycleManager; protected final int datasetID; - protected DatasetInfo dsInfo; - - public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) { + protected final DatasetInfo dsInfo; + + public BaseOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) { this.datasetLifecycleManager = datasetLifecycleManager; this.datasetID = datasetID; this.dsInfo = dsInfo; @@ -59,10 +60,6 @@ IModificationOperationCallback modificationCallback) throws HyracksDataException { } - public void setDatasetInfo(DatasetInfo dsInfo){ - this.dsInfo = dsInfo; - } - public void exclusiveJobCommitted() throws HyracksDataException { } } diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index 567f75e..a7374d3 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.Set; -import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.common.api.IndexException; @@ -42,11 +42,11 @@ private long maxMergableComponentSize; private int maxToleranceComponentCount; - private final DatasetLifecycleManager datasetLifecycleManager; + private final IDatasetLifecycleManager datasetLifecycleManager; private final int datasetID; - + public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) { - this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager; + this.datasetLifecycleManager = (DatasetLifecycleManager)datasetLifecycleManager; this.datasetID = datasetID; } @@ -70,7 +70,7 @@ } } if (fullMergeIsRequested) { - ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE, + ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFullMerge(index.getIOOperationCallback()); return; @@ -113,7 +113,7 @@ // Reverse the components order back to its original order Collections.reverse(mergableComponents); - ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor( + ILSMIndexAccessor accessor = lsmIndex.createAccessor( NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java index 385bbbe..ce405fc 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; @@ -42,8 +43,8 @@ @Override public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IHyracksTaskContext ctx) { - DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx - .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager(); + IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext() + .getApplicationContext().getApplicationObject()).getDatasetLifecycleManager(); ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetID); policy.configure(properties); return policy; diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 741e106..b2e7b69 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.ILocalResourceMetadata; import org.apache.asterix.common.config.AsterixStorageProperties; import org.apache.asterix.common.exceptions.ACIDException; @@ -37,7 +38,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.storage.am.common.api.IIndex; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; @@ -50,7 +50,7 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.LocalResource; -public class DatasetLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent { +public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent { private final AsterixStorageProperties storageProperties; private final Map<Integer, List<IVirtualBufferCache>> datasetVirtualBufferCaches; private final Map<Integer, ILSMOperationTracker> datasetOpTrackers; @@ -108,6 +108,7 @@ dsInfo.isExternal = !index.hasMemoryComponents(); dsInfo.isRegistered = true; } + if (dsInfo.indexes.containsKey(resourceID)) { throw new HyracksDataException("Index with resource ID " + resourceID + " already exists."); } @@ -193,21 +194,8 @@ throw new HyracksDataException("Failed to open index with resource ID " + resourceID + " since it does not exist."); } - - // This is not needed for external datasets' indexes since they never use the virtual buffer cache. if (!dsInfo.isOpen && !dsInfo.isExternal) { - List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did); - assert vbcs != null; - long additionalSize = 0; - for (IVirtualBufferCache vbc : vbcs) { - additionalSize += vbc.getNumPages() * vbc.getPageSize(); - } - while (used + additionalSize > capacity) { - if (!evictCandidateDataset()) { - throw new HyracksDataException("Cannot activate index since memory budget would be exceeded."); - } - } - used += additionalSize; + initializeDatasetVirtualBufferCache(did); } dsInfo.isOpen = true; @@ -226,7 +214,6 @@ // We will take a dataset that has no active transactions, it is open (a dataset consuming memory), // that is not being used (refcount == 0) and has been least recently used. The sort order defined // for DatasetInfo maintains this. See DatasetInfo.compareTo(). - List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values()); Collections.sort(datasetInfosList); for (DatasetInfo dsInfo : datasetInfosList) { @@ -241,7 +228,7 @@ return false; } - private void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException { + private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException { if (iInfo.isOpen) { ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); @@ -261,6 +248,7 @@ } } + @Override public DatasetInfo getDatasetInfo(int datasetID) { synchronized (datasetInfos) { DatasetInfo dsInfo = datasetInfos.get(datasetID); @@ -306,32 +294,39 @@ synchronized (datasetVirtualBufferCaches) { List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID); if (vbcs == null) { - vbcs = new ArrayList<IVirtualBufferCache>(); - int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties - .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages(); - for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) { - MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache( - new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages - / storageProperties.getMemoryComponentsNum())); - vbcs.add(vbc); + initializeDatasetVirtualBufferCache(datasetID); + vbcs = datasetVirtualBufferCaches.get(datasetID); + if (vbcs == null) { + throw new IllegalStateException("Could not find dataset " + datasetID + " virtual buffer cache."); } - datasetVirtualBufferCaches.put(datasetID, vbcs); } return vbcs; } } - private void removeDatasetFromCache(int datasetID) { - List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(datasetID); - assert vbcs != null; - for (IVirtualBufferCache vbc : vbcs) { - used -= (vbc.getNumPages() * vbc.getPageSize()); - } + private void removeDatasetFromCache(int datasetID) throws HyracksDataException { + deallocateDatasetMemory(datasetID); datasetInfos.remove(datasetID); datasetVirtualBufferCaches.remove(datasetID); datasetOpTrackers.remove(datasetID); } + private void initializeDatasetVirtualBufferCache(int datasetID) { + List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>(); + synchronized (datasetVirtualBufferCaches) { + int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties + .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages(); + for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) { + MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache( + new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages + / storageProperties.getMemoryComponentsNum())); + vbcs.add(vbc); + } + datasetVirtualBufferCaches.put(datasetID, vbcs); + } + } + + @Override public ILSMOperationTracker getOperationTracker(int datasetID) { synchronized (datasetOpTrackers) { ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID); @@ -376,12 +371,14 @@ private int numActiveIOOps; private boolean isExternal; private boolean isRegistered; + private boolean memoryAllocated; public DatasetInfo(int datasetID) { this.indexes = new HashMap<Long, IndexInfo>(); this.lastAccess = -1; this.datasetID = datasetID; this.isRegistered = false; + this.memoryAllocated = false; } @Override @@ -406,7 +403,7 @@ notifyAll(); } - public synchronized Set<ILSMIndex> getDatasetIndexes() throws HyracksDataException { + public synchronized Set<ILSMIndex> getDatasetIndexes() { Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>(); for (IndexInfo iInfo : indexes.values()) { if (iInfo.isOpen) { @@ -454,7 +451,16 @@ @Override public String toString() { return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount - + ", lastAccess: " + lastAccess + "}"; + + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: " + + memoryAllocated; + } + + public boolean isMemoryAllocated() { + return memoryAllocated; + } + + public int getDatasetID() { + return datasetID; } } @@ -463,12 +469,14 @@ used = 0; } + @Override public synchronized void flushAllDatasets() throws HyracksDataException { for (DatasetInfo dsInfo : datasetInfos.values()) { flushDatasetOpenIndexes(dsInfo, false); } } + @Override public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException { DatasetInfo datasetInfo = datasetInfos.get(datasetId); if (datasetInfo != null) { @@ -476,6 +484,7 @@ } } + @Override public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException { //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN for (DatasetInfo dsInfo : datasetInfos.values()) { @@ -537,7 +546,6 @@ accessor.scheduleFlush(iInfo.index.getIOOperationCallback()); } } else { - for (IndexInfo iInfo : dsInfo.indexes.values()) { // TODO: This is not efficient since we flush the indexes sequentially. // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this @@ -557,14 +565,12 @@ throw new HyracksDataException(e); } } - } try { flushDatasetOpenIndexes(dsInfo, false); } catch (Exception e) { throw new HyracksDataException(e); } - for (IndexInfo iInfo : dsInfo.indexes.values()) { if (iInfo.isOpen) { ILSMOperationTracker opTracker = iInfo.index.getOperationTracker(); @@ -576,7 +582,6 @@ assert iInfo.referenceCount == 0; } dsInfo.isOpen = false; - removeDatasetFromCache(dsInfo.datasetID); } @@ -628,4 +633,48 @@ outputStream.write(sb.toString().getBytes()); } -} + + @Override + public synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException { + DatasetInfo dsInfo = datasetInfos.get(datasetId); + if (dsInfo == null) { + throw new HyracksDataException("Failed to allocate memory for dataset with ID " + datasetId + + " since it is not open."); + } + synchronized (dsInfo) { + // This is not needed for external datasets' indexes since they never use the virtual buffer cache. + if (!dsInfo.memoryAllocated && !dsInfo.isExternal) { + List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID); + long additionalSize = 0; + for (IVirtualBufferCache vbc : vbcs) { + additionalSize += vbc.getNumPages() * vbc.getPageSize(); + } + while (used + additionalSize > capacity) { + if (!evictCandidateDataset()) { + throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID + + " memory since memory budget would be exceeded."); + } + } + used += additionalSize; + dsInfo.memoryAllocated = true; + } + } + } + + private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException { + DatasetInfo dsInfo = datasetInfos.get(datasetId); + if (dsInfo == null) { + throw new HyracksDataException("Failed to deallocate memory for dataset with ID " + datasetId + + " since it is not open."); + } + synchronized (dsInfo) { + if (dsInfo.isOpen && dsInfo.memoryAllocated) { + List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID); + for (IVirtualBufferCache vbc : vbcs) { + used -= (vbc.getNumPages() * vbc.getPageSize()); + } + dsInfo.memoryAllocated = false; + } + } + } +} \ No newline at end of file diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 6c63248..c3d75b1 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; @@ -48,7 +49,7 @@ private boolean flushOnExit = false; private boolean flushLogCreated = false; - public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID, + public PrimaryIndexOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID, ILogManager logManager, DatasetInfo dsInfo) { super(datasetLifecycleManager, datasetID, dsInfo); this.logManager = logManager; @@ -59,6 +60,9 @@ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback) throws HyracksDataException { if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) { + if (!dsInfo.isMemoryAllocated()) { + datasetLifecycleManager.allocateDatasetMemory(dsInfo.getDatasetID()); + } incrementNumActiveOperations(modificationCallback); } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) { dsInfo.declareActiveIOOperation(); @@ -77,7 +81,7 @@ @Override public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback) - throws HyracksDataException { + throws HyracksDataException { if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) { decrementNumActiveOperations(modificationCallback); if (numActiveOperations.get() == 0) { @@ -108,7 +112,7 @@ } if (needsFlush || flushOnExit) { - //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is schedule. + //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled. for (ILSMIndex lsmIndex : indexes) { AbstractLSMIndex abstractLSMIndex = ((AbstractLSMIndex) lsmIndex); ILSMOperationTracker opTracker = abstractLSMIndex.getOperationTracker(); @@ -133,7 +137,7 @@ } } - //Since this method is called sequentially by LogPage.notifyFlushTerminator in the sequence flush were scheduled. + //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java index c965dd2..d308564 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java @@ -22,8 +22,8 @@ import org.apache.asterix.common.api.AsterixThreadExecutor; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; @@ -42,7 +42,7 @@ public ITransactionSubsystem getTransactionSubsystem(); - public IIndexLifecycleManager getIndexLifecycleManager(); + public IDatasetLifecycleManager getDatasetLifecycleManager(); public double getBloomFilterFalsePositiveRate(); diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java index 69ea1f1..f030e18 100644 --- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java +++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java @@ -212,12 +212,19 @@ // In future this may be changed depending on the requested // output format sent to the servlet. String errorBody = method.getResponseBodyAsString(); - JSONObject result = new JSONObject(errorBody); - String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"), - result.getString("stacktrace") }; - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]); - throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine() - + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]); + JSONObject result = null; + try { + result = new JSONObject(errorBody); + } catch (Exception e) { + throw new Exception(errorBody); + } + if (result != null) { + String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"), + result.getString("stacktrace") }; + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]); + throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine() + + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]); + } } return statusCode; } diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index 698a54f..97260a1 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -85,8 +85,8 @@ */ public class MetadataManager implements IMetadataManager { private static final int INITIAL_SLEEP_TIME = 64; - private static final int RETRY_MULTIPLIER = 4; - private static final int MAX_RETRY_COUNT = 6; + private static final int RETRY_MULTIPLIER = 5; + private static final int MAX_RETRY_COUNT = 10; // Set in init(). public static MetadataManager INSTANCE; diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index d600f57..07675fe 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil; @@ -110,7 +111,7 @@ private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID); - private IIndexLifecycleManager indexLifecycleManager; + private IDatasetLifecycleManager datasetLifecycleManager; private ITransactionSubsystem transactionSubsystem; public static final MetadataNode INSTANCE = new MetadataNode(); @@ -121,7 +122,7 @@ public void initialize(IAsterixAppRuntimeContext runtimeContext) { this.transactionSubsystem = runtimeContext.getTransactionSubsystem(); - this.indexLifecycleManager = runtimeContext.getIndexLifecycleManager(); + this.datasetLifecycleManager = runtimeContext.getDatasetLifecycleManager(); } @Override @@ -285,9 +286,9 @@ throws Exception { long resourceID = metadataIndex.getResourceID(); String resourceName = metadataIndex.getFile().toString(); - ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName); + ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName); try { - indexLifecycleManager.open(resourceName); + datasetLifecycleManager.open(resourceName); // prepare a Callback for logging IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, @@ -308,7 +309,7 @@ } catch (Exception e) { throw e; } finally { - indexLifecycleManager.close(resourceName); + datasetLifecycleManager.close(resourceName); } } @@ -635,9 +636,9 @@ throws Exception { long resourceID = metadataIndex.getResourceID(); String resourceName = metadataIndex.getFile().toString(); - ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName); + ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName); try { - indexLifecycleManager.open(resourceName); + datasetLifecycleManager.open(resourceName); // prepare a Callback for logging IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex, lsmIndex, IndexOperation.DELETE); @@ -655,7 +656,7 @@ } catch (Exception e) { throw e; } finally { - indexLifecycleManager.close(resourceName); + datasetLifecycleManager.close(resourceName); } } @@ -969,8 +970,8 @@ try { IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET; String resourceName = index.getFile().toString(); - IIndex indexInstance = indexLifecycleManager.getIndex(resourceName); - indexLifecycleManager.open(resourceName); + IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName); + datasetLifecycleManager.open(resourceName); IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); @@ -988,11 +989,11 @@ } finally { rangeCursor.close(); } - indexLifecycleManager.close(resourceName); + datasetLifecycleManager.close(resourceName); index = MetadataPrimaryIndexes.DATASET_DATASET; - indexInstance = indexLifecycleManager.getIndex(resourceName); - indexLifecycleManager.open(resourceName); + indexInstance = datasetLifecycleManager.getIndex(resourceName); + datasetLifecycleManager.open(resourceName); indexAccessor = indexInstance .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); @@ -1010,11 +1011,11 @@ } finally { rangeCursor.close(); } - indexLifecycleManager.close(resourceName); + datasetLifecycleManager.close(resourceName); index = MetadataPrimaryIndexes.INDEX_DATASET; - indexInstance = indexLifecycleManager.getIndex(resourceName); - indexLifecycleManager.open(resourceName); + indexInstance = datasetLifecycleManager.getIndex(resourceName); + datasetLifecycleManager.open(resourceName); indexAccessor = indexInstance .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); @@ -1033,7 +1034,7 @@ } finally { rangeCursor.close(); } - indexLifecycleManager.close(resourceName); + datasetLifecycleManager.close(resourceName); } catch (Exception e) { e.printStackTrace(); } @@ -1044,8 +1045,8 @@ IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception { IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory(); String resourceName = index.getFile().toString(); - IIndex indexInstance = indexLifecycleManager.getIndex(resourceName); - indexLifecycleManager.open(resourceName); + IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName); + datasetLifecycleManager.open(resourceName); IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); @@ -1074,7 +1075,7 @@ } finally { rangeCursor.close(); } - indexLifecycleManager.close(resourceName); + datasetLifecycleManager.close(resourceName); } @Override @@ -1082,8 +1083,8 @@ int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID; try { String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString(); - IIndex indexInstance = indexLifecycleManager.getIndex(resourceName); - indexLifecycleManager.open(resourceName); + IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName); + datasetLifecycleManager.open(resourceName); try { IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); @@ -1110,7 +1111,7 @@ rangeCursor.close(); } } finally { - indexLifecycleManager.close(resourceName); + datasetLifecycleManager.close(resourceName); } } catch (Exception e) { diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index c94133a..0605b47 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -31,6 +31,7 @@ import java.util.logging.Logger; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.ILocalResourceMetadata; import org.apache.asterix.common.config.AsterixMetadataProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; @@ -38,7 +39,6 @@ import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.IAsterixPropertiesProvider; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import org.apache.asterix.metadata.IDatasetDetails; @@ -76,7 +76,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree; import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils; @@ -107,7 +106,7 @@ private static IBufferCache bufferCache; private static IFileMapProvider fileMapProvider; - private static IIndexLifecycleManager indexLifecycleManager; + private static IDatasetLifecycleManager dataLifecycleManager; private static ILocalResourceRepository localResourceRepository; private static IIOManager ioManager; @@ -155,7 +154,7 @@ nodeNames = metadataProperties.getNodeNames(); // nodeStores = asterixProperity.getStores(); - indexLifecycleManager = runtimeContext.getIndexLifecycleManager(); + dataLifecycleManager = runtimeContext.getDatasetLifecycleManager(); localResourceRepository = runtimeContext.getLocalResourceRepository(); bufferCache = runtimeContext.getBufferCache(); fileMapProvider = runtimeContext.getFileMapManager(); @@ -391,8 +390,8 @@ LSMBTree lsmBtree = null; long resourceID = -1; ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index - .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager, - index.getDatasetId().getId(), ((DatasetLifecycleManager) indexLifecycleManager).getDatasetInfo(index + .getDatasetId().getId()) : new BaseOperationTracker(dataLifecycleManager, + index.getDatasetId().getId(), dataLifecycleManager.getDatasetInfo(index .getDatasetId().getId())); final String path = file.getFile().getPath(); if (create) { @@ -406,7 +405,7 @@ bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getMetadataMergePolicyFactory().createMergePolicy( - GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker, + GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker, runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true); @@ -420,11 +419,11 @@ localResourceMetadata, LocalResource.LSMBTreeResource); ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory(); localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0)); - indexLifecycleManager.register(path, lsmBtree); + dataLifecycleManager.register(path, lsmBtree); } else { final LocalResource resource = localResourceRepository.getResourceByName(path); resourceID = resource.getResourceId(); - lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resource.getResourceName()); + lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName()); if (lsmBtree == null) { lsmBtree = LSMBTreeUtils.createLSMTree( virtualBufferCaches, @@ -436,10 +435,10 @@ bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getMetadataMergePolicyFactory().createMergePolicy( - GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker, + GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker, runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE .createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true); - indexLifecycleManager.register(path, lsmBtree); + dataLifecycleManager.register(path, lsmBtree); } } diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java index b4017fa..122a8ff 100644 --- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java +++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; -import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ILockManager; @@ -73,8 +73,7 @@ try { IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext() .getApplicationContext().getApplicationObject(); - DatasetLifecycleManager datasetLifeCycleManager = (DatasetLifecycleManager) runtimeCtx - .getIndexLifecycleManager(); + IDatasetLifecycleManager datasetLifeCycleManager = runtimeCtx.getDatasetLifecycleManager(); ILockManager lockManager = runtimeCtx.getTransactionSubsystem().getLockManager(); ITransactionManager txnManager = runtimeCtx.getTransactionSubsystem().getTransactionManager(); // get the local transaction diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml index b8673bb..e5ad1b4 100644 --- a/asterix-transactions/pom.xml +++ b/asterix-transactions/pom.xml @@ -107,10 +107,15 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>18.0</version> - </dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>18.0</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.10.19</version> + </dependency> </dependencies> </project> diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index 4c2f25d..58bc44e 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -55,7 +55,7 @@ ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getIndexLifecycleManager(); + .getDatasetLifecycleManager(); ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName); if (index == null) { throw new HyracksDataException("Index(id:" + resourceId + ") is not registered."); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java index 1693685..876fcb3 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java @@ -20,7 +20,7 @@ package org.apache.asterix.transaction.management.opcallbacks; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; -import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider; @@ -37,8 +37,8 @@ @Override public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) { - DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx - .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager(); + IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext() + .getApplicationContext().getApplicationObject()).getDatasetLifecycleManager(); return dslcManager.getOperationTracker(datasetID); } diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index 9c897f2..48ebff3 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -51,7 +51,7 @@ Object resource, IHyracksTaskContext ctx) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getIndexLifecycleManager(); + .getDatasetLifecycleManager(); ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName); if (index == null) { throw new HyracksDataException("Index(id:" + resourceId + ") is not registered."); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java index 7b17193..7b1ec04 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java @@ -19,8 +19,8 @@ package org.apache.asterix.transaction.management.opcallbacks; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider; @@ -37,8 +37,8 @@ @Override public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) { - DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx - .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager(); + IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx + .getJobletContext().getApplicationContext().getApplicationObject()).getDatasetLifecycleManager(); return new BaseOperationTracker(dslcManager, datasetID, dslcManager.getDatasetInfo(datasetID)); } diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java index 4de0749..23eb2be 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java @@ -52,7 +52,7 @@ Object resource, IHyracksTaskContext ctx) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getIndexLifecycleManager(); + .getDatasetLifecycleManager(); ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName); if (index == null) { throw new HyracksDataException("Index(id:" + resourceId + ") is not registered."); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java index c2f56a0..6e0394a 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java @@ -52,7 +52,7 @@ Object resource, IHyracksTaskContext ctx) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getIndexLifecycleManager(); + .getDatasetLifecycleManager(); ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName); if (index == null) { throw new HyracksDataException("Index(id:" + resourceId + ") is not registered."); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java index 5851240..26f56b7 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -57,10 +56,10 @@ bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties, - runtimeContextProvider.getIndexLifecycleManager()), - new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), - datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()) - .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(), + runtimeContextProvider.getDatasetLifecycleManager()), + new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(), + datasetID, runtimeContextProvider.getDatasetLifecycleManager() + .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1, true); return lsmBTree; } diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java index af315f9..beb1bb8 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -69,9 +68,9 @@ btreeCmpFactories, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties, - runtimeContextProvider.getIndexLifecycleManager()), - new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), - datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()) + runtimeContextProvider.getDatasetLifecycleManager()), + new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(), + datasetID, runtimeContextProvider.getDatasetLifecycleManager() .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(), LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), buddyBtreeFields, -1, true); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java index 576eccd..b9f5af9 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -69,10 +68,10 @@ rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties, - runtimeContextProvider.getIndexLifecycleManager()), - new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider - .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider - .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider + runtimeContextProvider.getDatasetLifecycleManager()), + new BaseOperationTracker(runtimeContextProvider + .getDatasetLifecycleManager(), datasetID, runtimeContextProvider + .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE .createIOOperationCallback(), linearizeCmpFactory, btreeFields, -1, true); } catch (TreeIndexException e) { diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java index 495d56e..0adcf53 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java @@ -76,10 +76,10 @@ bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties, - runtimeContextProvider.getIndexLifecycleManager()), + runtimeContextProvider.getDatasetLifecycleManager()), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker( - (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, - ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()) + runtimeContextProvider.getDatasetLifecycleManager(), datasetID, + runtimeContextProvider.getDatasetLifecycleManager() .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, true); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java index d76702e..e635349 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -90,10 +89,10 @@ filePath, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties, - runtimeContextProvider.getIndexLifecycleManager()), - new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider - .getIndexLifecycleManager(), datasetID, - ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()) + runtimeContextProvider.getDatasetLifecycleManager()), + new BaseOperationTracker(runtimeContextProvider + .getDatasetLifecycleManager(), datasetID, + runtimeContextProvider.getDatasetLifecycleManager() .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(), LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields, @@ -111,10 +110,9 @@ filePath, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties, - runtimeContextProvider.getIndexLifecycleManager()), - new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider - .getIndexLifecycleManager(), datasetID, - ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()) + runtimeContextProvider.getDatasetLifecycleManager()), + new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(), datasetID, + runtimeContextProvider.getDatasetLifecycleManager() .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(), LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields, diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java index 31a3283..c99e052 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.asterix.common.context.BaseOperationTracker; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -91,10 +90,10 @@ rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties, - runtimeContextProvider.getIndexLifecycleManager()), - new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider - .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider - .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider + runtimeContextProvider.getDatasetLifecycleManager()), + new BaseOperationTracker(runtimeContextProvider + .getDatasetLifecycleManager(), datasetID, runtimeContextProvider + .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE .createIOOperationCallback(), linearizeCmpFactory, rtreeFields, btreeFields, filterTypeTraits, filterCmpFactories, filterFields, true); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java index f6880db..65ed01c 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java @@ -18,17 +18,17 @@ */ package org.apache.asterix.transaction.management.service.locking; +import static org.mockito.Mockito.mock; + import java.util.List; import java.util.concurrent.Executors; import org.apache.asterix.common.api.AsterixThreadExecutor; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.common.api.IIndex; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; @@ -40,7 +40,7 @@ class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider { AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory()); - IIndexLifecycleManager ilm = new IndexLifecycleManager(); + IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class); @Override public AsterixThreadExecutor getThreadExecutor() { @@ -63,8 +63,8 @@ } @Override - public IIndexLifecycleManager getIndexLifecycleManager() { - return ilm; + public IDatasetLifecycleManager getDatasetLifecycleManager() { + return dlcm; } @Override @@ -105,42 +105,5 @@ @Override public IAsterixAppRuntimeContext getAppContext() { throw new UnsupportedOperationException(); - } - - static class IndexLifecycleManager implements IIndexLifecycleManager { - @Override - public List<IIndex> getOpenIndexes() { - throw new UnsupportedOperationException(); - } - - @Override - public void register(String resourceName, IIndex index) throws HyracksDataException { - throw new UnsupportedOperationException(); - } - - @Override - public void open(String resourceName) throws HyracksDataException { - throw new UnsupportedOperationException(); - } - - @Override - public void close(String resourceName) throws HyracksDataException { - throw new UnsupportedOperationException(); - } - - @Override - public IIndex getIndex(String resourceName) throws HyracksDataException { - throw new UnsupportedOperationException(); - } - - @Override - public void unregister(String resourceName) throws HyracksDataException { - throw new UnsupportedOperationException(); - } - - @Override - public IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException { - throw new UnsupportedOperationException(); - } } } \ No newline at end of file diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java index fec04e4..9bc4c53 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java @@ -47,8 +47,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.ILocalResourceMetadata; -import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.transactions.DatasetId; @@ -70,7 +70,6 @@ import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndex; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -96,7 +95,7 @@ private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp"; private static final long MEGABYTE = 1024L * 1024L; private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null; - private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //2MB; + private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //4MB; /** * A file at a known location that contains the LSN of the last log record @@ -105,7 +104,7 @@ private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_"; private SystemState state; - public RecoveryManager(TransactionSubsystem txnSubsystem) throws ACIDException { + public RecoveryManager(TransactionSubsystem txnSubsystem) { this.txnSubsystem = txnSubsystem; this.logMgr = (LogManager) txnSubsystem.getLogManager(); this.checkpointHistory = this.txnSubsystem.getTransactionProperties().getCheckpointHistory(); @@ -268,7 +267,9 @@ boolean foundWinner = false; IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); - IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager(); + //get datasetLifeCycleManager + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository(); Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository) .loadAndGetAllResources(); @@ -320,17 +321,17 @@ //get index instance from IndexLifeCycleManager //if index is not registered into IndexLifeCycleManager, //create the index using LocalMetadata stored in LocalResourceRepository - index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName()); + index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName()); if (index == null) { //#. create index instance and register to indexLifeCycleManager localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject(); index = localResourceMetadata.createIndexInstance(appRuntimeContext, localResource.getResourceName(), localResource.getPartition()); - indexLifecycleManager.register(localResource.getResourceName(), index); - indexLifecycleManager.open(localResource.getResourceName()); + datasetLifecycleManager.register(localResource.getResourceName(), index); + datasetLifecycleManager.open(localResource.getResourceName()); //#. get maxDiskLastLSN - ILSMIndex lsmIndex = (ILSMIndex) index; + ILSMIndex lsmIndex = index; maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) .getComponentLSN(lsmIndex.getImmutableComponents()); @@ -341,7 +342,7 @@ } if (LSN > maxDiskLastLsn) { - redo(logRecord); + redo(logRecord, datasetLifecycleManager); redoCount++; } } @@ -361,7 +362,7 @@ //close all indexes Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet(); for (long r : resourceIdList) { - indexLifecycleManager.close(resourcesMap.get(r).getResourceName()); + datasetLifecycleManager.close(resourcesMap.get(r).getResourceName()); } if (LOGGER.isLoggable(Level.INFO)) { @@ -398,8 +399,8 @@ //right after the new checkpoint file is written. File[] prevCheckpointFiles = getPreviousCheckpointFiles(); - DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem - .getAsterixAppRuntimeContextProvider().getIndexLifecycleManager(); + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); //flush all in-memory components if it is the sharp checkpoint if (isSharpCheckpoint) { datasetLifecycleManager.flushAllDatasets(); @@ -474,9 +475,9 @@ } public long getMinFirstLSN() throws HyracksDataException { - IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getIndexLifecycleManager(); - List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes(); + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); + List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes(); long firstLSN; //the min first lsn can only be the current append or smaller long minFirstLSN = logMgr.getAppendLSN(); @@ -557,7 +558,7 @@ return parentDir.listFiles(filter); } - private String getCheckpointFileName(String baseDir, String suffix) { + private static String getCheckpointFileName(String baseDir, String suffix) { if (!baseDir.endsWith(System.getProperty("file.separator"))) { baseDir += System.getProperty("file.separator"); } @@ -703,6 +704,8 @@ //undo loserTxn's effect LOGGER.log(Level.INFO, "undoing loser transaction's effect"); + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); //TODO sort loser entities by smallest LSN to undo in one pass. Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); int undoCount = 0; @@ -719,7 +722,7 @@ if (IS_DEBUG_MODE) { LOGGER.info(logRecord.getLogRecordForDisplay()); } - undo(logRecord); + undo(logRecord, datasetLifecycleManager); undoCount++; } } @@ -749,10 +752,10 @@ // do nothing } - private void undo(ILogRecord logRecord) { + private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { try { - ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager() - .getIndex(logRecord.getDatasetId(), logRecord.getResourceId()); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), + logRecord.getResourceId()); ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { @@ -767,10 +770,10 @@ } } - private void redo(ILogRecord logRecord) { + private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { try { - ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager() - .getIndex(logRecord.getDatasetId(), logRecord.getResourceId()); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), + logRecord.getResourceId()); ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { @@ -982,7 +985,7 @@ public TxnId() { } - private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) { + private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) { int readOffset = pkValue.getFieldStart(0); byte[] readBuffer = pkValue.getFieldData(0); for (int i = 0; i < pkSize; i++) { @@ -1034,7 +1037,7 @@ } } - private boolean isEqual(byte[] a, byte[] b, int size) { + private static boolean isEqual(byte[] a, byte[] b, int size) { for (int i = 0; i < size; i++) { if (a[i] != b[i]) { return false; @@ -1043,7 +1046,7 @@ return true; } - private boolean isEqual(byte[] a, ITupleReference b, int size) { + private static boolean isEqual(byte[] a, ITupleReference b, int size) { int readOffset = b.getFieldStart(0); byte[] readBuffer = b.getFieldData(0); for (int i = 0; i < size; i++) { @@ -1054,7 +1057,7 @@ return true; } - private boolean isEqual(ITupleReference a, ITupleReference b, int size) { + private static boolean isEqual(ITupleReference a, ITupleReference b, int size) { int aOffset = a.getFieldStart(0); byte[] aBuffer = a.getFieldData(0); int bOffset = b.getFieldStart(0); @@ -1067,7 +1070,7 @@ return true; } - public void serialize(ByteBuffer buffer) throws IOException { + public void serialize(ByteBuffer buffer) { buffer.putInt(jobId); buffer.putInt(datasetId); buffer.putInt(pkHashValue); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java index 2551ee2..1686e17 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java @@ -19,8 +19,8 @@ package org.apache.asterix.transaction.management.service.transaction; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager; import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; @@ -65,9 +65,9 @@ } @Override - public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) { + public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) { return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getIndexLifecycleManager(); + .getDatasetLifecycleManager(); } @Override diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java index 09fbb06..eeb65e2 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java @@ -56,7 +56,7 @@ this.recoveryManager = new RecoveryManager(this); if (asterixAppRuntimeContextProvider != null) { this.checkpointThread = new CheckpointThread(recoveryManager, - asterixAppRuntimeContextProvider.getIndexLifecycleManager(),logManager, + asterixAppRuntimeContextProvider.getDatasetLifecycleManager(),logManager, this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency()); this.checkpointThread.start(); } else { -- To view, visit https://asterix-gerrit.ics.uci.edu/408 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2 Gerrit-PatchSet: 16 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Carey <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: Young-Seok Kim <[email protected]>
