Repository: asterixdb Updated Branches: refs/heads/master 4729fdbd4 -> 34d75c418
Add Checkpoint Test This change adds a unit test case which validates that checkpoints do not delete log files that are still required for recovery, and delete those that are no longer needed. Change-Id: I4cb4743fe488deb5ad10f65604adc2231948795e Reviewed-on: https://asterix-gerrit.ics.uci.edu/1270 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ian Maxon <ima...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/34d75c41 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/34d75c41 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/34d75c41 Branch: refs/heads/master Commit: 34d75c418d2e266ef321b2dc7dbfddd1459cbf87 Parents: 4729fdb Author: Murtadha Hubail <mhub...@uci.edu> Authored: Sun Oct 23 19:42:23 2016 +0300 Committer: Murtadha Hubail <hubail...@gmail.com> Committed: Tue Oct 25 11:20:43 2016 -0700 ---------------------------------------------------------------------- .../app/bootstrap/TestNodeController.java | 25 ++- .../apache/asterix/test/common/TestHelper.java | 57 +++++- .../asterix/test/dataflow/LogMarkerTest.java | 35 +--- .../asterix/test/logging/CheckpointingTest.java | 196 +++++++++++++++++++ .../config/AsterixTransactionProperties.java | 10 +- .../management/service/logging/LogManager.java | 2 +- 6 files changed, 278 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34d75c41/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index a806532..88ca736 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -32,11 +32,8 @@ import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.context.TransactionSubsystemProvider; import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; -import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; @@ -46,6 +43,7 @@ import org.apache.asterix.metadata.utils.DatasetUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; +import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider; import org.apache.asterix.test.runtime.ExecutionTestUtil; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider; @@ -67,7 +65,6 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.util.HyracksConstants; @@ -115,8 +112,12 @@ public class TestNodeController { private JobId jobId; private long jobCounter = 0L; private IHyracksJobletContext jobletCtx; + private final String testConfigFileName; + private final boolean runHDFS; - public TestNodeController() throws AsterixException, HyracksException, ACIDException { + public TestNodeController(String testConfigFileName, boolean runHDFS) { + this.testConfigFileName = testConfigFileName; + this.runHDFS = runHDFS; } public void init() throws Exception { @@ -125,7 +126,9 @@ public class TestNodeController { outdir.mkdirs(); // remove library directory TestLibrarian.removeLibraryDir(); - ExecutionTestUtil.setUp(cleanupOnStart); + ExecutionTestUtil.setUp(cleanupOnStart, + testConfigFileName == null ? TEST_CONFIG_FILE_NAME : testConfigFileName, + ExecutionTestUtil.integrationUtil, runHDFS); } catch (Throwable th) { th.printStackTrace(); throw th; @@ -299,7 +302,7 @@ public class TestNodeController { PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterFields); TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo); - return getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc); + return getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo, indexOpDesc); } public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, @@ -308,7 +311,7 @@ public class TestNodeController { PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterFields); TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo); - LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, + LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo, indexOpDesc); dataflowHelper.create(); } @@ -359,9 +362,11 @@ public class TestNodeController { return primaryIndexTypeTraits; } - public IHyracksTaskContext createTestContext() throws HyracksDataException { + public IHyracksTaskContext createTestContext(boolean withMessaging) throws HyracksDataException { IHyracksTaskContext ctx = TestUtils.create(KB32); - TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx); + if (withMessaging) { + TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx); + } ctx = Mockito.spy(ctx); Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx); Mockito.when(ctx.getIOManager()) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34d75c41/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java index 5661258..c1399fb 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java @@ -19,6 +19,7 @@ package org.apache.asterix.test.common; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -28,11 +29,22 @@ import java.util.List; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; + +import org.apache.asterix.common.configuration.AsterixConfiguration; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; public final class TestHelper { + private static final String TEST_DIR_BASE_PATH = System.getProperty("user.dir") + File.separator + "target"; + private static final String[] TEST_DIRS = new String[] { "txnLogDir", "IODevice", "spill_area", "config" }; + public static boolean isInPrefixList(List<String> prefixList, String s) { for (String s2 : prefixList) { if (s.startsWith(s2)) { @@ -52,11 +64,11 @@ public final class TestHelper { Enumeration<? extends ZipEntry> entries = zipFile.entries(); while (entries.hasMoreElements()) { ZipEntry entry = entries.nextElement(); - File entryDestination = new File(outputDir, entry.getName()); + File entryDestination = new File(outputDir, entry.getName()); if (!entry.isDirectory()) { entryDestination.getParentFile().mkdirs(); try (InputStream in = zipFile.getInputStream(entry); - OutputStream out = new FileOutputStream(entryDestination)) { + OutputStream out = new FileOutputStream(entryDestination)) { IOUtils.copy(in, out); } } @@ -72,4 +84,43 @@ public final class TestHelper { } } } -} + + public static AsterixConfiguration getConfigurations(String fileName) + throws IOException, JAXBException, AsterixException { + try (InputStream is = TestHelper.class.getClassLoader().getResourceAsStream(fileName)) { + if (is != null) { + JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class); + Unmarshaller unmarshaller = ctx.createUnmarshaller(); + return (AsterixConfiguration) unmarshaller.unmarshal(is); + } else { + throw new AsterixException("Could not find configuration file " + fileName); + } + } + } + + public static void writeConfigurations(AsterixConfiguration ac, String fileName) + throws FileNotFoundException, IOException, JAXBException { + File configFile = new File(fileName); + if (!configFile.exists()) { + configFile.getParentFile().mkdirs(); + configFile.createNewFile(); + } else { + configFile.delete(); + } + try (FileOutputStream os = new FileOutputStream(fileName)) { + JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class); + Marshaller marshaller = ctx.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + marshaller.marshal(ac, os); + } + } + + public static void deleteExistingInstanceFiles() { + for (String dirName : TEST_DIRS) { + File f = new File(joinPath(TEST_DIR_BASE_PATH, dirName)); + if (FileUtils.deleteQuietly(f)) { + System.out.println("Dir " + f.getName() + " deleted"); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34d75c41/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java index a0ef31e..f41666d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.test.dataflow; -import java.io.File; import java.util.Collection; import java.util.Collections; @@ -27,7 +26,6 @@ import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter; import org.apache.asterix.app.data.gen.TupleGenerator; import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ILogRecord; @@ -39,8 +37,8 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningS import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.test.common.TestHelper; import org.apache.asterix.transaction.management.service.logging.LogReader; -import org.apache.commons.io.FileUtils; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -63,12 +61,11 @@ import org.junit.Test; public class LogMarkerTest { - private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); - private static final GenerationFunction[] RECORD_GEN_FUNCTION = - { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC }; + private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC, + GenerationFunction.DETERMINISTIC }; private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; private static final ARecordType META_TYPE = null; private static final GenerationFunction[] META_GEN_FUNCTION = null; @@ -78,7 +75,6 @@ public class LogMarkerTest { private static final int NUM_OF_RECORDS = 100000; private static final int SNAPSHOT_SIZE = 1000; private static final int DATASET_ID = 101; - private static final String SPILL_AREA = "target" + File.separator + "spill_area"; private static final String DATAVERSE_NAME = "TestDV"; private static final String DATASET_NAME = "TestDS"; private static final String DATA_TYPE_NAME = "DUMMY"; @@ -86,37 +82,20 @@ public class LogMarkerTest { @Before public void setUp() throws Exception { - System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME); System.out.println("SetUp: "); - File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir"); - FileUtils.deleteQuietly(f); - System.out.println("Dir " + f.getName() + " deleted"); - f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice"); - FileUtils.deleteQuietly(f); - System.out.println("Dir " + f.getName() + " deleted"); - f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA); - FileUtils.deleteQuietly(f); - System.out.println("Dir " + f.getName() + " deleted"); + TestHelper.deleteExistingInstanceFiles(); } @After public void tearDown() throws Exception { System.out.println("TearDown"); - File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir"); - FileUtils.deleteQuietly(f); - System.out.println("Dir " + f.getName() + " deleted"); - f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice"); - FileUtils.deleteQuietly(f); - System.out.println("Dir " + f.getName() + " deleted"); - f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA); - FileUtils.deleteQuietly(f); - System.out.println("Dir " + f.getName() + " deleted"); + TestHelper.deleteExistingInstanceFiles(); } @Test public void testInsertWithSnapshot() { try { - TestNodeController nc = new TestNodeController(); + TestNodeController nc = new TestNodeController(null, false); nc.init(); Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, @@ -125,7 +104,7 @@ public class LogMarkerTest { try { nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null); - IHyracksTaskContext ctx = nc.createTestContext(); + IHyracksTaskContext ctx = nc.createTestContext(true); nc.newJobId(); ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true); AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34d75c41/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java new file mode 100644 index 0000000..e3932ca --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.logging; + +import java.io.File; +import java.util.Collections; + +import org.apache.asterix.app.bootstrap.TestNodeController; +import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.common.config.AsterixTransactionProperties; +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.configuration.AsterixConfiguration; +import org.apache.asterix.common.configuration.Property; +import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable; +import org.apache.asterix.common.transactions.DatasetId; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.external.util.DataflowUtils; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.test.common.TestHelper; +import org.apache.asterix.transaction.management.service.logging.LogManager; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; +import org.apache.hyracks.util.StorageUtil; +import org.apache.hyracks.util.StorageUtil.StorageUnit; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CheckpointingTest { + + private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; + private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml"; + private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "target" + + File.separator + "config"; + private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME; + private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; + private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, + new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); + private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC, + GenerationFunction.DETERMINISTIC }; + private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; + private static final ARecordType META_TYPE = null; + private static final GenerationFunction[] META_GEN_FUNCTION = null; + private static final boolean[] UNIQUE_META_FIELDS = null; + private static final int[] KEY_INDEXES = { 0 }; + private static final int[] KEY_INDICATORS = { 0 }; + private static final int DATASET_ID = 101; + private static final String DATAVERSE_NAME = "TestDV"; + private static final String DATASET_NAME = "TestDS"; + private static final String DATA_TYPE_NAME = "DUMMY"; + private static final String NODE_GROUP_NAME = "DEFAULT"; + private static final int TXN_LOG_PARTITION_SIZE = StorageUtil.getSizeInBytes(2, StorageUnit.MEGABYTE); + + @Before + public void setUp() throws Exception { + System.out.println("SetUp: "); + TestHelper.deleteExistingInstanceFiles(); + // Read default test configurations + AsterixConfiguration ac = TestHelper.getConfigurations(DEFAULT_TEST_CONFIG_FILE_NAME); + // Set log file size to 2MB + ac.getProperty().add(new Property(AsterixTransactionProperties.TXN_LOG_PARTITIONSIZE_KEY, + String.valueOf(TXN_LOG_PARTITION_SIZE), "")); + // Disable checkpointing by making checkpoint thread wait max wait time + ac.getProperty().add(new Property(AsterixTransactionProperties.TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY, + String.valueOf(Integer.MAX_VALUE), "")); + // Write test config file + TestHelper.writeConfigurations(ac, TEST_CONFIG_FILE_PATH); + } + + @After + public void tearDown() throws Exception { + System.out.println("TearDown"); + TestHelper.deleteExistingInstanceFiles(); + } + + @Test + public void testDeleteOldLogFiles() { + try { + TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false); + nc.init(); + Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, + NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, + Collections.emptyList(), null, null, null, false, null, false), + null, DatasetType.INTERNAL, DATASET_ID, 0); + try { + nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, + null); + IHyracksTaskContext ctx = nc.createTestContext(false); + nc.newJobId(); + ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true); + // Prepare insert operation + AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, + RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null); + insertOp.open(); + TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, + RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + VSizeFrame frame = new VSizeFrame(ctx); + FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); + + LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager(); + // Number of log files after node startup should be one + int numberOfLogFiles = logManager.getLogFileIds().size(); + Assert.assertEquals(1, numberOfLogFiles); + + // Low-water mark LSN + long lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN(); + // Low-water mark log file id + long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN); + // Initial Low-water mark should be in the only available log file + Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue()); + + // Insert records until a new log file is created + while (logManager.getLogFileIds().size() == 1) { + ITupleReference tuple = tupleGenerator.next(); + DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); + } + + // Check if the new low-water mark is still in the initial low-water mark log file + lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN(); + long currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN); + + if (currentLowWaterMarkLogFileId == initialLowWaterMarkFileId) { + /* + * Make sure checkpoint will not delete the initial log file since + * the low-water mark is still in it (i.e. it is still required for + * recovery) + */ + int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size(); + nc.getTransactionSubsystem().getRecoveryManager().checkpoint(false, logManager.getAppendLSN()); + int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size(); + Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint); + + /* + * Insert records until the low-water mark is not in the initialLowWaterMarkFileId + * either because of the asynchronous flush caused by the previous checkpoint or a flush + * due to the dataset memory budget getting full. + */ + while (currentLowWaterMarkLogFileId == initialLowWaterMarkFileId) { + ITupleReference tuple = tupleGenerator.next(); + DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); + lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN(); + currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN); + } + } + + /* + * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so + * a checkpoint should delete it. + */ + nc.getTransactionSubsystem().getRecoveryManager().checkpoint(false, logManager.getAppendLSN()); + + // Validate initialLowWaterMarkFileId was deleted + for (Long fileId : logManager.getLogFileIds()) { + Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue()); + } + + if (tupleAppender.getTupleCount() > 0) { + tupleAppender.write(insertOp, true); + } + insertOp.close(); + nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true); + } finally { + nc.deInit(); + } + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34d75c41/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java index afc9e4f..0480887 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java @@ -18,13 +18,13 @@ */ package org.apache.asterix.common.config; +import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE; +import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; + import java.util.Map; import org.apache.hyracks.util.StorageUtil; -import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE; -import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; - public class AsterixTransactionProperties extends AbstractAsterixProperties { private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages"; @@ -33,13 +33,13 @@ public class AsterixTransactionProperties extends AbstractAsterixProperties { private static final String TXN_LOG_BUFFER_PAGESIZE_KEY = "txn.log.buffer.pagesize"; private static final int TXN_LOG_BUFFER_PAGESIZE_DEFAULT = StorageUtil.getSizeInBytes(128, KILOBYTE); - private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize"; + public static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize"; private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = StorageUtil.getSizeInBytes(256L, MEGABYTE); private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold"; private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = StorageUtil.getSizeInBytes(64, MEGABYTE); - private static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency"; + public static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency"; private static final int TXN_LOG_CHECKPOINT_POLLFREQUENCY_DEFAULT = 120; // 120s private static final String TXN_LOG_CHECKPOINT_HISTORY_KEY = "txn.log.checkpoint.history"; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34d75c41/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 7d61462..947ebc7 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -423,7 +423,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { } } - private List<Long> getLogFileIds() { + public List<Long> getLogFileIds() { File fileLogDir = new File(logDir); String[] logFileNames = null; List<Long> logFileIds = null;