abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][STO] Recover from failure in memory allocation callback ......................................................................
[NO ISSUE][STO] Recover from failure in memory allocation callback - user model changes: no - storage format changes: no - interface changes: no details: - Previously, if an exception is thrown in the ILSMIOOperationCallback.allocated call, then the memory component is allocated but the flag memoryComponentsAllocated is false. - Any subsequent attempt to modify the index will try to allocate the component but since it has already been allocated, it will fail with the exception: File is already mapped. - In this change, if an exception is thrown from the callback, then the component is de-allocated before throwing the exception. - A test case is added. Change-Id: I80e605461df18c7f6d7785cd7504ca3acb4f45b1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2336 Reviewed-by: Murtadha Hubail <[email protected]> Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java 13 files changed, 665 insertions(+), 372 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 9828424..a46b029 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -20,64 +20,38 @@ import java.io.File; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; import java.util.function.Predicate; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; -import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter; import 
org.apache.asterix.app.data.gen.TupleGenerator; -import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.TransactionOptions; import org.apache.asterix.external.util.DataflowUtils; -import org.apache.asterix.file.StorageComponentProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; -import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.om.types.IAType; import org.apache.asterix.test.common.TestHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.asterix.test.dataflow.StorageTestUtils.Flusher; +import org.apache.asterix.test.dataflow.StorageTestUtils.Merger; +import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.test.CountAnswer; -import 
org.apache.hyracks.api.test.FrameWriterTestUtils; -import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; -import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -87,46 +61,16 @@ public class ComponentRollbackTest { - private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; - private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, - new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); - private static final GenerationFunction[] RECORD_GEN_FUNCTION = - { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC }; - private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; - private static final ARecordType META_TYPE = null; - private static final GenerationFunction[] META_GEN_FUNCTION = null; - private static final boolean[] UNIQUE_META_FIELDS = null; - private static final int[] KEY_INDEXES = { 0 }; - private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR }; - private static final 
List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR }); - private static final int TOTAL_NUM_OF_RECORDS = 10000; - private static final int RECORDS_PER_COMPONENT = 1000; - private static final int DATASET_ID = 101; - private static final String DATAVERSE_NAME = "TestDV"; - private static final String DATASET_NAME = "TestDS"; - private static final String DATA_TYPE_NAME = "DUMMY"; - private static final String NODE_GROUP_NAME = "DEFAULT"; private static final Predicate<ILSMComponent> memoryComponentsPredicate = c -> c instanceof ILSMMemoryComponent; - private static final StorageComponentProvider storageManager = new StorageComponentProvider(); private static TestNodeController nc; private static TestLsmBtree lsmBtree; private static NCAppRuntimeContext ncAppCtx; private static IDatasetLifecycleManager dsLifecycleMgr; - private static Dataset dataset; private static IHyracksTaskContext ctx; private static IIndexDataflowHelper indexDataflowHelper; private static ITransactionContext txnCtx; private static LSMInsertDeleteOperatorNodePushable insertOp; - public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() { - @Override - public void before(Semaphore smeaphore) { - smeaphore.release(); - } - - @Override - public void after() { - } - }; + private static final int PARTITION = 0; @BeforeClass public static void setUp() throws Exception { @@ -149,27 +93,18 @@ @Before public void createIndex() throws Exception { - List<List<String>> partitioningKeys = new ArrayList<>(); - partitioningKeys.add(Collections.singletonList("key")); - int partition = 0; - dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, - NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, - PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null), - null, DatasetType.INTERNAL, DATASET_ID, 0); - PrimaryIndexInfo primaryIndexInfo = 
nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, - storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition); + PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION); IndexDataflowHelperFactory iHelperFactory = new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); JobId jobId = nc.newJobId(); - ctx = nc.createTestContext(jobId, partition, false); - indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); + ctx = nc.createTestContext(jobId, PARTITION, false); + indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION); indexDataflowHelper.open(); lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance(); indexDataflowHelper.close(); txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); - insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, - KEY_INDICATORS_LIST, storageManager, null).getLeft(); + insertOp = StorageTestUtils.getInsertPipeline(nc, ctx); } @After @@ -177,26 +112,18 @@ indexDataflowHelper.destroy(); } - static void allowAllOps(TestLsmBtree lsmBtree) { - lsmBtree.addModifyCallback(ALLOW_CALLBACK); - lsmBtree.addFlushCallback(ALLOW_CALLBACK); - lsmBtree.addSearchCallback(ALLOW_CALLBACK); - lsmBtree.addMergeCallback(ALLOW_CALLBACK); - } - @Test public void testRollbackWhileNoOp() { try { // allow all operations - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new 
FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -210,28 +137,28 @@ } insertOp.close(); nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); - // get all components List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents(); List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, + StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT); // rollback the last disk component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - - dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + 
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT)); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, + StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT)); } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -239,28 +166,21 @@ } public void flush(boolean async) throws Exception { - flush(dsLifecycleMgr, lsmBtree, dataset, async); - } - - public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset, - boolean async) throws Exception { - waitForOperations(lsmBtree); - dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async); + StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, async); } @Test public void testRollbackThenInsert() { try { // allow all operations - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -279,23 +199,22 
@@ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - - dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents(memoryComponentsPredicate); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, + StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT); // insert again nc.newJobId(); txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); - insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, - KEY_INDICATORS_LIST, storageManager, null).getLeft(); + insertOp = StorageTestUtils.getInsertPipeline(nc, ctx); insertOp.open(); - for (int j = 0; j < RECORDS_PER_COMPONENT; j++) { + for (int j = 0; j < StorageTestUtils.RECORDS_PER_COMPONENT; j++) { ITupleReference tuple = tupleGenerator.next(); DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); } @@ -304,16 +223,16 @@ } insertOp.close(); nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // rollback the last disk 
component lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - - dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT)); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, + StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT)); } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -324,16 +243,15 @@ public void testRollbackWhileSearch() { try { // allow all operations but search - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearSearchCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -353,42 +271,43 @@ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); 
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS); + Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // wait till firstSearcher enter the components firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); - - dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); // rollback a memory component lsmAccessor.deleteComponents( c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified())); // now that the rollback has completed, we will unblock the search - lsmBtree.addSearchCallback(ALLOW_CALLBACK); + lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); // search now and ensure - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, + StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT); // rollback the last disk component // re-block searches lsmBtree.clearSearchCallbacks(); - Searcher secondSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, - TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT); + Searcher secondSearcher = new Searcher(nc, PARTITION, lsmBtree, + StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT); // wait till firstSearcher enter the components secondSearcher.waitUntilEntered(); lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); long lsn = 
AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata()); - dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn); lsmAccessor.deleteComponents(pred); // now that the rollback has completed, we will unblock the search - lsmBtree.addSearchCallback(ALLOW_CALLBACK); + lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); lsmBtree.allowSearch(1); Assert.assertTrue(secondSearcher.result()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT)); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, + StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT)); } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -399,15 +318,14 @@ public void testRollbackWhileFlush() { try { // allow all operations - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -426,7 +344,7 @@ 
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // disable flushes lsmBtree.clearFlushCallbacks(); Flusher firstFlusher = new Flusher(lsmBtree); @@ -435,7 +353,7 @@ // now that we enetered, we will rollback. This will not proceed since it is waiting for the flush to complete Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate); // now that the rollback has completed, we will search - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); //unblock the flush lsmBtree.allowFlush(1); // ensure rollback completed @@ -443,7 +361,7 @@ // ensure current mem component is not modified Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); // search now and ensure - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -454,16 +372,15 @@ public void testRollbackWhileMerge() { try { // allow all operations but merge - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearMergeCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new 
FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -482,7 +399,7 @@ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // Now, we will start a full merge Merger merger = new Merger(lsmBtree); ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); @@ -499,7 +416,7 @@ Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); // rollback is now waiting for the merge to complete // we will search - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); //unblock the merge lsmBtree.allowMerge(1); // ensure rollback completes @@ -507,8 +424,8 @@ // ensure current mem component is not modified Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); // search now and ensure that we rolled back the merged component - searchAndAssertCount(nc, 0, dataset, storageManager, - TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT)); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS + - ((numMergedComponents + 1/*memory component*/) * 
StorageTestUtils.RECORDS_PER_COMPONENT)); } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -519,15 +436,14 @@ public void testRollbackWhileFlushAndSearchFlushExistsFirst() { try { // allow all operations - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -546,7 +462,7 @@ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // disable flushes // disable searches lsmBtree.clearFlushCallbacks(); @@ -554,21 +470,21 @@ Flusher firstFlusher = new Flusher(lsmBtree); flush(true); firstFlusher.waitUntilCount(1); - Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS); + Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // wait till firstSearcher enter the components firstSearcher.waitUntilEntered(); // now 
that we enetered, we will rollback rollback a memory component Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate); //unblock the flush lsmBtree.allowFlush(1); - lsmBtree.addSearchCallback(ALLOW_CALLBACK); + lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); // ensure current mem component is not modified rollerback.complete(); Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); // search now and ensure the rollback was no op since it waits for ongoing flushes - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -579,15 +495,14 @@ public void testRollbackWhileFlushAndSearchSearchExistsFirst() { try { // allow all operations - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -606,7 +521,7 @@ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); 
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // disable flushes // disable searches lsmBtree.clearFlushCallbacks(); @@ -614,13 +529,13 @@ flush(true); firstFlusher.waitUntilCount(1); lsmBtree.clearSearchCallbacks(); - Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS); + Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // wait till firstSearcher enter the components firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate); // The rollback will be waiting for the flush to complete - lsmBtree.addSearchCallback(ALLOW_CALLBACK); + lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); //unblock the flush @@ -629,7 +544,7 @@ rollerback.complete(); Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); // search now and ensure - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -640,16 +555,15 @@ public void testRollbackWhileMergeAndSearchMergeExitsFirst() { try { // allow all operations except merge - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearMergeCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = 
StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -668,7 +582,7 @@ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // Now, we will start a merge Merger merger = new Merger(lsmBtree); ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); @@ -683,7 +597,7 @@ merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); - Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS); + Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // wait till firstSearcher enter the components firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback @@ -692,13 +606,13 @@ // unblock the merge lsmBtree.allowMerge(1); // unblock the search - lsmBtree.addSearchCallback(ALLOW_CALLBACK); + lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); rollerback.complete(); // now that the rollback has completed, we will search - searchAndAssertCount(nc, 0, dataset, storageManager, - TOTAL_NUM_OF_RECORDS - 
((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT)); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS + - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT)); // ensure current mem component is not modified Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); } catch (Throwable e) { @@ -711,16 +625,15 @@ public void testRollbackWhileMergeAndSearchSearchExitsFirst() { try { // allow all operations except merge - allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearMergeCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) { + for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { // flush every RECORDS_PER_COMPONENT records - if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) { + if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) { if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } @@ -739,7 +652,7 @@ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents(); Assert.assertEquals(9, diskComponents.size()); Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // Now, we will start a merge Merger merger = new Merger(lsmBtree); ILSMIndexAccessor mergeAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); @@ -754,22 +667,22 @@ merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); - Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS); + Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS); // wait till firstSearcher enter the components firstSearcher.waitUntilEntered(); // now that we enetered, we will rollback Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); // unblock the search - lsmBtree.addSearchCallback(ALLOW_CALLBACK); + lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); lsmBtree.allowSearch(1); Assert.assertTrue(firstSearcher.result()); // even though rollback has been called, it is still waiting for the merge to complete - searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS); //unblock the merge lsmBtree.allowMerge(1); rollerBack.complete(); - searchAndAssertCount(nc, 0, dataset, storageManager, - TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT)); + StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS + - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT)); // ensure current mem component is not modified Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified()); } catch (Throwable e) { @@ -789,7 +702,7 @@ public void run() { ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); try { - dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh(); + dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh(); ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0); 
lsmAccessor.deleteComponents(predicate); } catch (HyracksDataException e) { @@ -805,102 +718,6 @@ task.join(); if (failure != null) { throw failure; - } - } - } - - static class Searcher { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - private Future<Boolean> task; - private volatile boolean entered = false; - - public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager, - TestLsmBtree lsmBtree, int numOfRecords) { - lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() { - - @Override - public void before(Semaphore sem) { - synchronized (Searcher.this) { - entered = true; - Searcher.this.notifyAll(); - } - } - - @Override - public void after() { - } - }); - Callable<Boolean> callable = new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords); - return true; - } - }; - task = executor.submit(callable); - } - - boolean result() throws Exception { - return task.get(); - } - - synchronized void waitUntilEntered() throws InterruptedException { - while (!entered) { - this.wait(); - } - } - } - - private class Merger { - private volatile int count = 0; - - public Merger(TestLsmBtree lsmBtree) { - lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() { - - @Override - public void before(Semaphore smeaphore) { - synchronized (Merger.this) { - count++; - Merger.this.notifyAll(); - } - } - - @Override - public void after() { - } - }); - } - - synchronized void waitUntilCount(int count) throws InterruptedException { - while (this.count != count) { - this.wait(); - } - } - } - - private class Flusher { - private volatile int count = 0; - - public Flusher(TestLsmBtree lsmBtree) { - lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() { - - @Override - public void before(Semaphore smeaphore) { - synchronized (Flusher.this) { - count++; - Flusher.this.notifyAll(); - } - } - - 
@Override - public void after() { - } - }); - } - - synchronized void waitUntilCount(int count) throws InterruptedException { - while (this.count != count) { - this.wait(); } } } @@ -923,50 +740,5 @@ return false; } } - } - - static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset, - StorageComponentProvider storageManager, int numOfRecords) - throws HyracksDataException, AlgebricksException { - JobId jobId = nc.newJobId(); - IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false); - TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE), - Collections.emptyList(), Collections.emptyList(), false); - IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, - new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager); - emptyTupleOp.open(); - emptyTupleOp.close(); - Assert.assertEquals(numOfRecords, countOp.getCount()); - } - - public static void waitForOperations(ILSMIndex index) throws InterruptedException { - // wait until number of activeOperation reaches 0 - PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker(); - long maxWaitTime = 60000L; // 1 minute - long before = System.currentTimeMillis(); - while (opTracker.getNumActiveOperations() > 0) { - Thread.sleep(5); // NOSONAR: Test code with a timeout - if (System.currentTimeMillis() - before > maxWaitTime) { - throw new IllegalStateException( - (System.currentTimeMillis() - before) + "ms passed without completing the frame operation"); - } - } - } - - public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor, - Collection<FrameWriterOperation> exceptionThrowingOperations, - Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) { - CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open, - 
exceptionThrowingOperations, errorThrowingOperations); - CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame, - exceptionThrowingOperations, errorThrowingOperations); - CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush, - exceptionThrowingOperations, errorThrowingOperations); - CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail, - exceptionThrowingOperations, errorThrowingOperations); - CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close, - exceptionThrowingOperations, errorThrowingOperations); - return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer, - closeAnswer, deepCopyInputFrames); } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java new file mode 100644 index 0000000..8bafd32 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.dataflow; + +import java.io.File; + +import org.apache.asterix.app.bootstrap.TestNodeController; +import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; +import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.nc.NCAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.TransactionOptions; +import org.apache.asterix.external.util.DataflowUtils; +import org.apache.asterix.test.common.TestHelper; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback; +import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class IoCallbackFailureTest { + + private static final int PARTITION = 0; + private static TestNodeController nc; + + @BeforeClass + public static void setUp() throws Exception { + System.out.println("SetUp: "); + TestHelper.deleteExistingInstanceFiles(); + String 
configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test" + + File.separator + "resources" + File.separator + "cc.conf"; + nc = new TestNodeController(configPath, false); + nc.init(); + } + + @AfterClass + public static void tearDown() throws Exception { + System.out.println("TearDown"); + nc.deInit(); + TestHelper.deleteExistingInstanceFiles(); + } + + @Test + public void testTempFailureInAllocateCallback() throws Exception { + PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION); + IndexDataflowHelperFactory iHelperFactory = + new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); + JobId jobId = nc.newJobId(); + IHyracksTaskContext ctx = nc.createTestContext(jobId, PARTITION, false); + IIndexDataflowHelper indexDataflowHelper = + iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION); + indexDataflowHelper.open(); + TestLsmBtree lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance(); + indexDataflowHelper.close(); + LSMInsertDeleteOperatorNodePushable insertOp = StorageTestUtils.getInsertPipeline(nc, ctx); + StorageTestUtils.allowAllOps(lsmBtree); + ITestOpCallback<ILSMMemoryComponent> failCallback = new ITestOpCallback<ILSMMemoryComponent>() { + @SuppressWarnings("deprecation") + @Override + public void before(ILSMMemoryComponent c) throws HyracksDataException { + throw new HyracksDataException("Fail on allocate callback"); + } + + @Override + public void after() throws HyracksDataException { + // No Op + } + }; + lsmBtree.addIoAllocateCallback(failCallback); + boolean expectedExceptionThrown = false; + try { + insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS, + StorageTestUtils.RECORDS_PER_COMPONENT); + } catch (Exception e) { + expectedExceptionThrown = true; + } + Assert.assertTrue(expectedExceptionThrown); + // Clear the callback and retry + lsmBtree.clearIoAllocateCallback(); + jobId = 
nc.newJobId(); + ctx = nc.createTestContext(jobId, PARTITION, false); + insertOp = StorageTestUtils.getInsertPipeline(nc, ctx); + insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS, + StorageTestUtils.RECORDS_PER_COMPONENT); + } + + private static void insert(TestNodeController nc, TestLsmBtree lsmBtree, IHyracksTaskContext ctx, + LSMInsertDeleteOperatorNodePushable insertOp, int totalNumRecords, int recordsPerComponent) + throws Exception { + NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext(); + IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager(); + TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), + new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); + boolean failed = false; + try { + try { + insertOp.open(); + VSizeFrame frame = new VSizeFrame(ctx); + FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); + for (int j = 0; j < totalNumRecords; j++) { + // flush every recordsPerComponent records + if (j % recordsPerComponent == 0 && j + 1 != totalNumRecords) { + if (tupleAppender.getTupleCount() > 0) { + tupleAppender.write(insertOp, true); + } + StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false); + } + ITupleReference tuple = tupleGenerator.next(); + DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); + } + if (tupleAppender.getTupleCount() > 0) { + tupleAppender.write(insertOp, true); + } + } catch (Throwable th) { + failed = true; + insertOp.fail(); + throw th; + } finally { + insertOp.close(); + } + } finally { + if (failed) { + nc.getTransactionManager().abortTransaction(txnCtx.getTxnId()); + } else { + nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); + } + } + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java index b39a5c6..367d0b9 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java @@ -189,8 +189,8 @@ } // allow all operations for (int i = 0; i < NUM_PARTITIONS; i++) { - ComponentRollbackTest.allowAllOps(primaryLsmBtrees[i]); - ComponentRollbackTest.allowAllOps(secondaryLsmBtrees[i]); + StorageTestUtils.allowAllOps(primaryLsmBtrees[i]); + StorageTestUtils.allowAllOps(secondaryLsmBtrees[i]); actors[i].add(new Request(Request.Action.INSERT_OPEN)); } } @@ -224,9 +224,9 @@ } ensureDone(actors[0]); // search now and ensure partition 0 has all the records - ComponentRollbackTest.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); + StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS); // and that partition 1 has no records - ComponentRollbackTest.searchAndAssertCount(nc, 1, dataset, storageManager, 0); + StorageTestUtils.searchAndAssertCount(nc, 1, dataset, storageManager, 0); // and that partition 0 has numFlushes disk components Assert.assertEquals(totalNumOfComponents, primaryLsmBtrees[0].getDiskComponents().size()); // and that partition 1 has no disk components @@ -655,7 +655,7 @@ if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOps[partition], true); } - ComponentRollbackTest.waitForOperations(primaryLsmBtrees[partition]); + StorageTestUtils.waitForOperations(primaryLsmBtrees[partition]); break; default: break; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java index 7bc7a88..c452548 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java @@ -45,7 +45,7 @@ import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.test.common.TestHelper; -import org.apache.asterix.test.dataflow.ComponentRollbackTest.Searcher; +import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.api.comm.VSizeFrame; @@ -147,7 +147,7 @@ } void unblockSearch(TestLsmBtree lsmBtree) { - lsmBtree.addSearchCallback(ComponentRollbackTest.ALLOW_CALLBACK); + lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK); lsmBtree.allowSearch(1); } @@ -155,7 +155,7 @@ public void testCursorSwitchSucceed() { try { // allow all operations - ComponentRollbackTest.allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); // except search lsmBtree.clearSearchCallbacks(); insertOp.open(); @@ -170,7 +170,7 @@ if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } - ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false); + StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false); } ITupleReference tuple = tupleGenerator.next(); DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); @@ -183,7 +183,7 @@ firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS); // wait till firstSearcher enter the components firstSearcher.waitUntilEntered(); - ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false); + StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false); nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); // unblock the search unblockSearch(lsmBtree); @@ -201,7 +201,7 @@ 
public void testCursorSwitchFails() { try { // allow all operations - ComponentRollbackTest.allowAllOps(lsmBtree); + StorageTestUtils.allowAllOps(lsmBtree); // except search lsmBtree.clearSearchCallbacks(); insertOp.open(); @@ -216,7 +216,7 @@ if (tupleAppender.getTupleCount() > 0) { tupleAppender.write(insertOp, true); } - ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false); + StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false); } ITupleReference tuple = tupleGenerator.next(); DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); @@ -229,7 +229,7 @@ firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS); // wait till firstSearcher enter the components firstSearcher.waitUntilEntered(); - ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false); + StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false); nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); // merge all components ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE); @@ -255,7 +255,7 @@ throws HyracksDataException, AlgebricksException { nc.newJobId(); TestTupleCounterFrameWriter countOp = - ComponentRollbackTest.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE), + StorageTestUtils.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE), Collections.emptyList(), Collections.emptyList(), false); IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java new file mode 100644 index 0000000..e7a455c --- /dev/null +++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.dataflow; + +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; + +import org.apache.asterix.app.bootstrap.TestNodeController; +import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; +import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter; +import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; +import 
org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.file.StorageComponentProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.test.CountAnswer; +import org.apache.hyracks.api.test.FrameWriterTestUtils; +import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation; +import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback; +import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; +import org.junit.Assert; + +public class StorageTestUtils { + + public static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; + public static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, + new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); + public static final GenerationFunction[] RECORD_GEN_FUNCTION = + { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC }; + public static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; + public static final ARecordType META_TYPE = null; + public static final GenerationFunction[] META_GEN_FUNCTION = null; + 
public static final boolean[] UNIQUE_META_FIELDS = null; + public static final int[] KEY_INDEXES = { 0 }; + public static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR }; + public static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR }); + public static final int TOTAL_NUM_OF_RECORDS = 10000; + public static final int RECORDS_PER_COMPONENT = 1000; + public static final int DATASET_ID = 101; + public static final String DATAVERSE_NAME = "TestDV"; + public static final String DATASET_NAME = "TestDS"; + public static final String DATA_TYPE_NAME = "DUMMY"; + public static final String NODE_GROUP_NAME = "DEFAULT"; + public static final StorageComponentProvider STORAGE_MANAGER = new StorageComponentProvider(); + public static final List<List<String>> PARTITIONING_KEYS = + new ArrayList<>(Collections.singletonList(Collections.singletonList(RECORD_TYPE.getFieldNames()[0]))); + public static final TestDataset DATASET = + new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, + NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, + PARTITIONING_KEYS, null, null, null, false, null), + null, DatasetType.INTERNAL, DATASET_ID, 0); + public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() { + @Override + public void before(Semaphore smeaphore) { + smeaphore.release(); + } + + @Override + public void after() { + } + }; + + private StorageTestUtils() { + } + + static void allowAllOps(TestLsmBtree lsmBtree) { + lsmBtree.addModifyCallback(ALLOW_CALLBACK); + lsmBtree.addFlushCallback(ALLOW_CALLBACK); + lsmBtree.addSearchCallback(ALLOW_CALLBACK); + lsmBtree.addMergeCallback(ALLOW_CALLBACK); + } + + public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, int partition) + throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { + return nc.createPrimaryIndex(DATASET, 
KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES, + KEY_INDICATORS_LIST, partition); + } + + public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx) + throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { + return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, + KEY_INDICATORS_LIST, STORAGE_MANAGER, null).getLeft(); + } + + public static TupleGenerator getTupleGenerator() { + return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION, + UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + } + + public static void searchAndAssertCount(TestNodeController nc, int partition, int numOfRecords) + throws HyracksDataException, AlgebricksException { + searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords); + } + + public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset, + StorageComponentProvider storageManager, int numOfRecords) + throws HyracksDataException, AlgebricksException { + JobId jobId = nc.newJobId(); + IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false); + TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE), + Collections.emptyList(), Collections.emptyList(), false); + IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, + new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager); + emptyTupleOp.open(); + emptyTupleOp.close(); + Assert.assertEquals(numOfRecords, countOp.getCount()); + } + + public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor, + Collection<FrameWriterOperation> exceptionThrowingOperations, + Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) { + CountAnswer openAnswer 
= FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open, + exceptionThrowingOperations, errorThrowingOperations); + CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame, + exceptionThrowingOperations, errorThrowingOperations); + CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush, + exceptionThrowingOperations, errorThrowingOperations); + CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail, + exceptionThrowingOperations, errorThrowingOperations); + CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close, + exceptionThrowingOperations, errorThrowingOperations); + return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer, + closeAnswer, deepCopyInputFrames); + } + + public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, boolean async) + throws Exception { + flush(dsLifecycleMgr, lsmBtree, DATASET, async); + } + + public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset, + boolean async) throws Exception { + waitForOperations(lsmBtree); + dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async); + } + + public static void waitForOperations(ILSMIndex index) throws InterruptedException { + // wait until the number of active operations reaches 0 + PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker(); + long maxWaitTime = 60000L; // 1 minute + long before = System.currentTimeMillis(); + while (opTracker.getNumActiveOperations() > 0) { + Thread.sleep(5); // NOSONAR: Test code with a timeout + if (System.currentTimeMillis() - before > maxWaitTime) { + throw new IllegalStateException( + (System.currentTimeMillis() - before) + "ms passed without completing the frame operation"); + } + } + } + + public static class Searcher { + private final ExecutorService executor = 
Executors.newSingleThreadExecutor(); + private Future<Boolean> task; + private volatile boolean entered = false; + + public Searcher(TestNodeController nc, int partition, TestLsmBtree lsmBtree, int numOfRecords) { + this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords); + } + + public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager, + TestLsmBtree lsmBtree, int numOfRecords) { + lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() { + + @Override + public void before(Semaphore sem) { + synchronized (Searcher.this) { + entered = true; + Searcher.this.notifyAll(); + } + } + + @Override + public void after() { + } + }); + Callable<Boolean> callable = new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords); + return true; + } + }; + task = executor.submit(callable); + } + + public boolean result() throws Exception { + return task.get(); + } + + public synchronized void waitUntilEntered() throws InterruptedException { + while (!entered) { + this.wait(); + } + } + } + + public static class Merger { + private volatile int count = 0; + + public Merger(TestLsmBtree lsmBtree) { + lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() { + + @Override + public void before(Semaphore smeaphore) { + synchronized (Merger.this) { + count++; + Merger.this.notifyAll(); + } + } + + @Override + public void after() { + } + }); + } + + public synchronized void waitUntilCount(int count) throws InterruptedException { + while (this.count != count) { + this.wait(); + } + } + } + + public static class Flusher { + private volatile int count = 0; + + public Flusher(TestLsmBtree lsmBtree) { + lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() { + + @Override + public void before(Semaphore smeaphore) { + synchronized (Flusher.this) { + count++; + Flusher.this.notifyAll(); + } + } + + @Override + public void 
after() { + } + }); + } + + public synchronized void waitUntilCount(int count) throws InterruptedException { + while (this.count != count) { + this.wait(); + } + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java index 453431d..905c99d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java @@ -81,12 +81,28 @@ throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_ACTIVE_INDEX); } fileId = bufferCache.createFile(file); - bufferCache.openFile(fileId); - freePageManager.open(fileId); - freePageManager.init(interiorFrameFactory, leafFrameFactory); - setRootPage(); - freePageManager.close(); - bufferCache.closeFile(fileId); + boolean failed = true; + try { + bufferCache.openFile(fileId); + failed = false; + } finally { + if (failed) { + bufferCache.deleteFile(fileId); + } + } + failed = true; + try { + freePageManager.open(fileId); + freePageManager.init(interiorFrameFactory, leafFrameFactory); + setRootPage(); + freePageManager.close(); + failed = false; + } finally { + bufferCache.closeFile(fileId); + if (failed) { + bufferCache.deleteFile(fileId); + } + } } private void setRootPage() throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java index f892585..c72d402 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java @@ -76,7 +76,8 @@ void setState(ComponentState state); /** - * Allocates memory to this component, create and activate it + * Allocates memory to this component, then creates and activates it. + * This method is atomic. If an exception is thrown, then the call had no effect. * * @throws HyracksDataException */ diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index e9f410d..749b3ba 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -438,9 +438,29 @@ if (memoryComponentsAllocated || memoryComponents == null) { return; } - for (ILSMMemoryComponent c : memoryComponents) { - c.allocate(); - ioOpCallback.allocated(c); + int i = 0; + boolean allocated = false; + try { + for (; i < memoryComponents.size(); i++) { + allocated = false; + ILSMMemoryComponent c = memoryComponents.get(i); + c.allocate(); + allocated = true; + ioOpCallback.allocated(c); + } + } finally { + if (i < memoryComponents.size()) { + // something went wrong + if (allocated) { + ILSMMemoryComponent c = memoryComponents.get(i); + c.deallocate(); + } + // deallocate all previous components + for (int j = i - 1; j >= 0; j--) { + ILSMMemoryComponent c = memoryComponents.get(j); + c.deallocate(); + } + } } memoryComponentsAllocated = true; } diff 
--git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index c0bef7d..3fbef18 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -237,19 +237,40 @@ @Override public final void allocate() throws HyracksDataException { + boolean allocated = false; ((IVirtualBufferCache) getIndex().getBufferCache()).open(); - doAllocate(); + try { + doAllocate(); + allocated = true; + } finally { + if (!allocated) { + ((IVirtualBufferCache) getIndex().getBufferCache()).close(); + } + } } protected void doAllocate() throws HyracksDataException { - getIndex().create(); - getIndex().activate(); + boolean created = false; + boolean activated = false; + try { + getIndex().create(); + created = true; + getIndex().activate(); + activated = true; + } finally { + if (created && !activated) { + getIndex().destroy(); + } + } } @Override public final void deallocate() throws HyracksDataException { - doDeallocate(); - getIndex().getBufferCache().close(); + try { + doDeallocate(); + } finally { + getIndex().getBufferCache().close(); + } } protected void doDeallocate() throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java index 7a3d58b..9b25471 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java @@ -108,7 +108,15 @@ public synchronized void open() throws HyracksDataException { ++openCount; if (openCount == 1) { - vbc.open(); + boolean failed = true; + try { + vbc.open(); + failed = false; + } finally { + if (failed) { + openCount--; + } + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java index 816550b..19b4856 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java @@ -34,6 +34,7 @@ * Initializes the persistent state of an index. * An index cannot be created if it is in the activated state. * Calling create on an index that is deactivated has the effect of clearing the index. + * This method is atomic. If an exception is thrown, then the call had no effect. 
* * @throws HyracksDataException * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages, diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java index acc3347..e888238 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java @@ -18,8 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impl; -public interface ITestOpCallback<T> { - void before(T t); +import org.apache.hyracks.api.exceptions.HyracksDataException; - void after(); +public interface ITestOpCallback<T> { + void before(T t) throws HyracksDataException; + + void after() throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java index bf3bb31..3c781a6 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java @@ -109,7 +109,7 @@ } } - public static <T> void callback(ITestOpCallback<T> callback, T t) { + public static <T> void callback(ITestOpCallback<T> callback, T t) throws HyracksDataException { if 
(callback != null) { callback.before(t); } @@ -344,7 +344,7 @@ } } - public void beforeIoOperationCalled() { + public void beforeIoOperationCalled() throws HyracksDataException { synchronized (ioBeforeCallbacks) { for (ITestOpCallback<Void> callback : ioBeforeCallbacks) { callback.before(null); @@ -352,7 +352,7 @@ } } - public void beforeIoOperationReturned() { + public void beforeIoOperationReturned() throws HyracksDataException { synchronized (ioBeforeCallbacks) { for (ITestOpCallback<Void> callback : ioBeforeCallbacks) { callback.after(); @@ -360,7 +360,7 @@ } } - public void afterIoOperationCalled() { + public void afterIoOperationCalled() throws HyracksDataException { synchronized (ioAfterOpCallbacks) { for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) { callback.before(null); @@ -368,7 +368,7 @@ } } - public void afterIoOperationReturned() { + public void afterIoOperationReturned() throws HyracksDataException { synchronized (ioAfterOpCallbacks) { for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) { callback.after(); @@ -376,7 +376,7 @@ } } - public void afterIoFinalizeCalled() { + public void afterIoFinalizeCalled() throws HyracksDataException { synchronized (ioAfterFinalizeCallbacks) { for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) { callback.before(null); @@ -384,7 +384,7 @@ } } - public void afterIoFinalizeReturned() { + public void afterIoFinalizeReturned() throws HyracksDataException { synchronized (ioAfterFinalizeCallbacks) { for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) { callback.after(); @@ -392,7 +392,7 @@ } } - public void recycledCalled(ILSMMemoryComponent component) { + public void recycledCalled(ILSMMemoryComponent component) throws HyracksDataException { synchronized (ioRecycleCallbacks) { for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) { callback.before(component); @@ -400,7 +400,7 @@ } } - public void recycledReturned(ILSMMemoryComponent component) { + public void 
recycledReturned(ILSMMemoryComponent component) throws HyracksDataException { synchronized (ioRecycleCallbacks) { for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) { callback.after(); @@ -408,7 +408,7 @@ } } - public void allocatedCalled(ILSMMemoryComponent component) { + public void allocatedCalled(ILSMMemoryComponent component) throws HyracksDataException { synchronized (ioAllocateCallbacks) { for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) { callback.before(component); @@ -416,7 +416,7 @@ } } - public void allocatedReturned(ILSMMemoryComponent component) { + public void allocatedReturned(ILSMMemoryComponent component) throws HyracksDataException { synchronized (ioAllocateCallbacks) { for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) { callback.after(); -- To view, visit https://asterix-gerrit.ics.uci.edu/2336 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I80e605461df18c7f6d7785cd7504ca3acb4f45b1 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
