http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java new file mode 100644 index 0000000..a655fd8 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java @@ -0,0 +1,823 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import org.junit.BeforeClass; +import org.junit.Test; + +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import junit.framework.Assert; +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.*; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.BufferedSegmentedFile; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.utils.AlwaysPresentFilter; +import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; +import org.apache.cassandra.utils.concurrent.Transactional; + +public class LogTransactionTest extends AbstractTransactionalTest +{ + private static final String KEYSPACE = "TransactionLogsTest"; + + @BeforeClass + public static void setUp() + { + MockSchema.cleanup(); + } + + protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception + { + LogTransaction.waitForDeletions(); + SSTableReader.resetTidying(); + return new TxnTest(); + } + + private static final class TxnTest extends TestableTransaction + { + private final static class Transaction extends Transactional.AbstractTransactional implements Transactional + { + final ColumnFamilyStore cfs; + final LogTransaction txnLogs; + final SSTableReader sstableOld; + final SSTableReader sstableNew; + final LogTransaction.SSTableTidier tidier; + + Transaction(ColumnFamilyStore cfs, LogTransaction txnLogs) throws IOException + { + this.cfs = cfs; + this.txnLogs = txnLogs; + this.sstableOld = sstable(cfs, 0, 128); + this.sstableNew = sstable(cfs, 1, 128); + + assertNotNull(txnLogs); + assertNotNull(txnLogs.getId()); + Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType()); + + txnLogs.trackNew(sstableNew); + tidier = txnLogs.obsoleted(sstableOld); + assertNotNull(tidier); + } + + protected Throwable doCommit(Throwable accumulate) + { + sstableOld.markObsolete(tidier); + sstableOld.selfRef().release(); + LogTransaction.waitForDeletions(); + + Throwable ret = txnLogs.commit(accumulate); + + sstableNew.selfRef().release(); + return ret; + } + + protected Throwable doAbort(Throwable accumulate) + { + tidier.abort(); + LogTransaction.waitForDeletions(); + + Throwable ret = txnLogs.abort(accumulate); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + return ret; + } + + protected void doPrepare() + { + txnLogs.prepareToCommit(); + } + + void assertInProgress() throws Exception + { + assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + Collections.singleton(txnLogs.getLogFile().file.getPath())))); + } + + void assertPrepared() throws Exception + { + } + + void assertAborted() throws Exception + { + assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); + } + + void assertCommitted() throws Exception + { + assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + } + } + + final Transaction txn; + + private TxnTest() throws IOException + { + this(MockSchema.newCFS(KEYSPACE)); + } + + private TxnTest(ColumnFamilyStore cfs) throws IOException + { + this(cfs, new LogTransaction(OperationType.COMPACTION, cfs.metadata)); + } + + private TxnTest(ColumnFamilyStore cfs, LogTransaction txnLogs) throws IOException + { + this(new Transaction(cfs, txnLogs)); + } + + private TxnTest(Transaction txn) + { + super(txn); + this.txn = txn; + } + + protected void assertInProgress() throws Exception + { + txn.assertInProgress(); + } + + protected void assertPrepared() throws Exception + { + txn.assertPrepared(); + } + + protected void assertAborted() throws Exception + { + txn.assertAborted(); + } + + protected void assertCommitted() throws Exception + { + txn.assertCommitted(); + } + } + + @Test + public void testUntrack() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + // complete a transaction without keep the new files since they were untracked + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstableNew); + log.untrackNew(sstableNew); + + log.finish(); + + sstableNew.selfRef().release(); + Thread.sleep(1); + LogTransaction.waitForDeletions(); + + assertFiles(log.getDataFolder(), Collections.<String>emptySet()); + } + + @Test + public void testCommitSameDesc() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld1 = sstable(cfs, 0, 128); + SSTableReader sstableOld2 = sstable(cfs, 0, 256); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstableNew); + + sstableOld1.setReplaced(); + + LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld2); + assertNotNull(tidier); + + log.finish(); + + sstableOld2.markObsolete(tidier); + + sstableOld1.selfRef().release(); + sstableOld2.selfRef().release(); + + assertFiles(log.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + + sstableNew.selfRef().release(); + } + + @Test + public void testCommitOnlyNew() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstable); + log.finish(); + + assertFiles(log.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); + + sstable.selfRef().release(); + } + + @Test + public void testCommitOnlyOld() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + LogTransaction.SSTableTidier tidier = log.obsoleted(sstable); + assertNotNull(tidier); + + log.finish(); + sstable.markObsolete(tidier); + sstable.selfRef().release(); + + assertFiles(log.getDataFolder(), new HashSet<>()); + } + + @Test + public void testAbortOnlyNew() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstable); + log.abort(); + + sstable.selfRef().release(); + + assertFiles(log.getDataFolder(), new HashSet<>()); + } + + @Test + public void testAbortOnlyOld() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + LogTransaction.SSTableTidier tidier = log.obsoleted(sstable); + assertNotNull(tidier); + + tidier.abort(); + log.abort(); + + sstable.selfRef().release(); + + assertFiles(log.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); + } + + @Test + public void testRemoveUnfinishedLeftovers_abort() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld = sstable(cfs, 0, 128); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + // simulate tracking sstables with a failed transaction (new log file NOT deleted) + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstableNew); + LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld); + + Set<File> tmpFiles = sstableNew.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + + Assert.assertEquals(tmpFiles, LogAwareFileLister.getTemporaryFiles(sstableNew.descriptor.directory)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + // sstableOld should be only table left + Directories directories = new Directories(cfs.metadata); + Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); + assertEquals(1, sstables.size()); + + assertFiles(log.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); + + tidier.run(); + + // complete the transaction before releasing files + log.close(); + } + + @Test + public void testRemoveUnfinishedLeftovers_commit() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld = sstable(cfs, 0, 128); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + // simulate tracking sstables with a committed transaction (new log file deleted) + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstableNew); + LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld); + + //Fake a commit + log.getLogFile().commit(); + + Set<File> tmpFiles = sstableOld.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + + Assert.assertEquals(tmpFiles, LogAwareFileLister.getTemporaryFiles(sstableOld.descriptor.directory)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + // sstableNew should be only table left + Directories directories = new Directories(cfs.metadata); + Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); + assertEquals(1, sstables.size()); + + assertFiles(log.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + + tidier.run(); + + // complete the transaction to avoid LEAK errors + assertNull(log.complete(null)); + } + + @Test + public void testGetTemporaryFiles() throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable1 = sstable(cfs, 0, 128); + + File dataFolder = sstable1.descriptor.directory; + + Set<File> tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + + try(LogTransaction log = new LogTransaction(OperationType.WRITE, cfs.metadata)) + { + Directories directories = new Directories(cfs.metadata); + + File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); + + SSTableReader sstable2 = sstable(cfs, 1, 128); + log.trackNew(sstable2); + + Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); + assertEquals(2, sstables.size()); + + // this should contain sstable1, sstable2 and the transaction log file + File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); + + int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length; + assertEquals(numNewFiles - 1, sstable2.getAllFilePaths().size()); // new files except for transaction log file + + tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(numNewFiles - 1, tmpFiles.size()); + + File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA)); + File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX)); + + assertTrue(tmpFiles.contains(ssTable2DataFile)); + assertTrue(tmpFiles.contains(ssTable2IndexFile)); + + List<File> files = directories.sstableLister(Directories.OnTxnErr.THROW).listFiles(); + List<File> filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles(); + assertNotNull(files); + assertNotNull(filesNoTmp); + + assertTrue(files.contains(ssTable2DataFile)); + assertTrue(files.contains(ssTable2IndexFile)); + + assertFalse(filesNoTmp.contains(ssTable2DataFile)); + assertFalse(filesNoTmp.contains(ssTable2IndexFile)); + + log.finish(); + + //Now it should be empty since the transaction has finished + tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + + filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles(); + assertNotNull(filesNoTmp); + assertTrue(filesNoTmp.contains(ssTable2DataFile)); + assertTrue(filesNoTmp.contains(ssTable2IndexFile)); + + sstable1.selfRef().release(); + sstable2.selfRef().release(); + } + } + + @Test + public void testWrongChecksumLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake a commit with invalid checksum + FileUtils.append(t.getLogFile().file, + String.format("commit:[%d,0,0][%d]", + System.currentTimeMillis(), + 12345678L)); + }, + true); + } + + @Test + public void testWrongChecksumSecondFromLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake two lines with invalid checksum + FileUtils.append(t.getLogFile().file, + String.format("add:[ma-3-big,%d,4][%d]", + System.currentTimeMillis(), + 12345678L)); + + FileUtils.append(t.getLogFile().file, + String.format("commit:[%d,0,0][%d]", + System.currentTimeMillis(), + 12345678L)); + }, + false); + } + + @Test + public void testWrongChecksumLastLineMissingFile() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake a commit with invalid checksum and also delete one of the old files + for (String filePath : s.getAllFilePaths()) + { + if (filePath.endsWith("Data.db")) + { + assertTrue(FileUtils.delete(filePath)); + t.getLogFile().sync(); + break; + } + } + + FileUtils.append(t.getLogFile().file, + String.format("commit:[%d,0,0][%d]", + System.currentTimeMillis(), + 12345678L)); + }, + false); + } + + @Test + public void testWrongChecksumLastLineWrongRecordFormat() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake a commit with invalid checksum and a wrong record format (extra spaces) + FileUtils.append(t.getLogFile().file, + String.format("commit:[%d ,0 ,0 ][%d]", + System.currentTimeMillis(), + 12345678L)); + }, + true); + } + + @Test + public void testMissingChecksumLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { + // Fake a commit without a checksum + FileUtils.append(t.getLogFile().file, + String.format("commit:[%d,0,0]", + System.currentTimeMillis())); + }, + true); + } + + @Test + public void testMissingChecksumSecondFromLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake two lines without a checksum + FileUtils.append(t.getLogFile().file, + String.format("add:[ma-3-big,%d,4]", + System.currentTimeMillis())); + + FileUtils.append(t.getLogFile().file, + String.format("commit:[%d,0,0]", + System.currentTimeMillis())); + }, + false); + } + + private static void testCorruptRecord(BiConsumer<LogTransaction, SSTableReader> modifier, boolean isRecoverable) throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld = sstable(cfs, 0, 128); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + File dataFolder = sstableOld.descriptor.directory; + + // simulate tracking sstables with a committed transaction except the checksum will be wrong + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstableNew); + log.obsoleted(sstableOld); + + // Modify the transaction log or disk state for sstableOld + modifier.accept(log, sstableOld); + + String txnFilePath = log.getLogFile().file.getPath(); + + assertNull(log.complete(null)); + + sstableOld.selfRef().release(); + sstableNew.selfRef().release(); + + // The files on disk, for old files make sure to exclude the files that were deleted by the modifier + Set<String> newFiles = sstableNew.getAllFilePaths().stream().collect(Collectors.toSet()); + Set<String> oldFiles = sstableOld.getAllFilePaths().stream().filter(p -> new File(p).exists()).collect(Collectors.toSet()); + + //This should filter as in progress since the last record is corrupt + assertFiles(newFiles, LogAwareFileLister.getTemporaryFiles(dataFolder)); + assertFiles(oldFiles, LogAwareFileLister.getFinalFiles(dataFolder)); + + if (isRecoverable) + { // the corruption is recoverable but the commit record is unreadable so the transaction is still in progress + + //This should remove new files + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + // make sure to exclude the old files that were deleted by the modifier + assertFiles(dataFolder.getPath(), oldFiles); + } + else + { // if an intermediate line was also modified, it should ignore the tx log file + + //This should not remove any files + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(newFiles, + oldFiles, + Collections.singleton(txnFilePath)))); + } + } + + @Test + public void testObsoletedDataFileUpdateTimeChanged() throws IOException + { + testObsoletedFilesChanged(sstable -> + { + // increase the modification time of the Data file + for (String filePath : sstable.getAllFilePaths()) + { + if (filePath.endsWith("Data.db")) + assertTrue(new File(filePath).setLastModified(System.currentTimeMillis() + 60000)); //one minute later + } + }); + } + + private static void testObsoletedFilesChanged(Consumer<SSTableReader> modifier) throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld = sstable(cfs, 0, 128); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + // simulate tracking sstables with a committed transaction except the checksum will be wrong + LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(log); + + log.trackNew(sstableNew); + /*TransactionLog.SSTableTidier tidier =*/ log.obsoleted(sstableOld); + + //modify the old sstable files + modifier.accept(sstableOld); + + //Fake a commit + log.getLogFile().commit(); + + //This should not remove the old files + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + assertFiles(log.getDataFolder(), Sets.newHashSet(Iterables.concat( + sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + Collections.singleton(log.getLogFile().file.getPath())))); + + sstableOld.selfRef().release(); + sstableNew.selfRef().release(); + + // complete the transaction to avoid LEAK errors + assertNull(log.complete(null)); + + assertFiles(log.getDataFolder(), Sets.newHashSet(Iterables.concat( + sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + Collections.singleton(log.getLogFile().file.getPath())))); + } + + @Test + public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + File dataFolder = sstable.descriptor.directory; + + LogTransaction logs = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(logs); + + LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable); + + logs.finish(); + + sstable.markObsolete(tidier); + sstable.selfRef().release(); + + // This should race with the asynchronous deletion of txn log files + // It doesn't matter what it returns but it should not throw because the txn + // was completed before deleting files (i.e. releasing sstables) + for (int i = 0; i < 200; i++) + LogAwareFileLister.getTemporaryFiles(dataFolder); + } + + @Test + public void testGetTemporaryFilesThrowsIfCompletingAfterObsoletion() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + File dataFolder = sstable.descriptor.directory; + + LogTransaction logs = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + assertNotNull(logs); + + LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable); + + sstable.markObsolete(tidier); + sstable.selfRef().release(); + + LogTransaction.waitForDeletions(); + + try + { + // This should race with the asynchronous deletion of txn log files + // it should throw because we are violating the requirement that a transaction must + // finish before deleting files (i.e. releasing sstables) + LogAwareFileLister.getTemporaryFiles(dataFolder); + fail("Expected runtime exception"); + } + catch(RuntimeException e) + { + //pass + } + + logs.finish(); + } + + private static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException + { + Directories dir = new Directories(cfs.metadata); + Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getTableName(), generation); + Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); + for (Component component : components) + { + File file = new File(descriptor.filenameFor(component)); + if (!file.exists()) + assertTrue(file.createNewFile()); + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + { + raf.setLength(size); + } + } + + SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); + SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); + + SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList()); + StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) + .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header) + .get(MetadataType.STATS); + SSTableReader reader = SSTableReader.internalOpen(descriptor, + components, + cfs.metadata, + dFile, + iFile, + MockSchema.indexSummary.sharedCopy(), + new AlwaysPresentFilter(), + 1L, + metadata, + SSTableReader.OpenReason.NORMAL, + header); + reader.first = reader.last = MockSchema.readerBounds(generation); + return reader; + } + + private static void assertFiles(String dirPath, Set<String> expectedFiles) + { + assertFiles(dirPath, expectedFiles, false); + } + + private static void assertFiles(String dirPath, Set<String> expectedFiles, boolean excludeNonExistingFiles) + { + LogTransaction.waitForDeletions(); + + File dir = new File(dirPath); + File[] files = dir.listFiles(); + if (files != null) + { + for (File file : files) + { + if (file.isDirectory()) + continue; + + String filePath = file.getPath(); + assertTrue(filePath, expectedFiles.contains(filePath)); + expectedFiles.remove(filePath); + } + } + + if (excludeNonExistingFiles) + { + for (String filePath : expectedFiles) + { + File file = new File(filePath); + if (!file.exists()) + expectedFiles.remove(filePath); + } + } + + assertTrue(expectedFiles.toString(), expectedFiles.isEmpty()); + } + + // Check either that a temporary file is expected to exist (in the existingFiles) or that + // it does not exist any longer (on Windows we need to check File.exists() because a list + // might return a file as existing even if it does not) + private static void assertFiles(Iterable<String> existingFiles, Set<File> temporaryFiles) + { + for (String filePath : existingFiles) + { + File file = new File(filePath); + assertTrue(filePath, temporaryFiles.contains(file)); + temporaryFiles.remove(file); + } + + for (File file : temporaryFiles) + { + if (!file.exists()) + temporaryFiles.remove(file); + } + + assertTrue(temporaryFiles.toString(), temporaryFiles.isEmpty()); + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java index 9eff1b1..a7ad156 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -20,13 +20,11 @@ package org.apache.cassandra.db.lifecycle; import java.io.File; import java.io.IOException; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; import org.junit.Test; @@ -36,7 +34,6 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; @@ -87,7 +84,7 @@ public class RealTransactionsTest extends SchemaLoader SSTableReader oldSSTable = getSSTable(cfs, 1); LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION); SSTableReader newSSTable = replaceSSTable(cfs, txn, false); - TransactionLog.waitForDeletions(); + LogTransaction.waitForDeletions(); assertFiles(txn.log().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths())); } @@ -102,7 +99,7 @@ public class RealTransactionsTest extends SchemaLoader LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION); replaceSSTable(cfs, txn, true); - TransactionLog.waitForDeletions(); + LogTransaction.waitForDeletions(); assertFiles(txn.log().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths())); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index 0e36bb9..7b9b19c 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -30,7 +30,6 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import org.junit.BeforeClass; import org.junit.Test; @@ -189,9 +188,9 @@ public class TrackerTest public void testDropSSTables() { testDropSSTables(false); - TransactionLog.waitForDeletions(); + LogTransaction.waitForDeletions(); testDropSSTables(true); - TransactionLog.waitForDeletions(); + LogTransaction.waitForDeletions(); } private void testDropSSTables(boolean invalidate) @@ -214,7 +213,7 @@ public class TrackerTest else { tracker.dropSSTables(); - TransactionLog.waitForDeletions(); + LogTransaction.waitForDeletions(); } Assert.assertEquals(9, cfs.metric.totalDiskSpaceUsed.getCount()); Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java deleted file mode 100644 index 405d975..0000000 --- a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java +++ /dev/null @@ -1,812 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.lifecycle; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.*; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import org.junit.BeforeClass; -import org.junit.Test; - -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import junit.framework.Assert; -import org.apache.cassandra.MockSchema; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.compaction.*; -import org.apache.cassandra.io.sstable.*; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.sstable.metadata.MetadataType; -import org.apache.cassandra.io.sstable.metadata.StatsMetadata; -import org.apache.cassandra.io.util.BufferedSegmentedFile; -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.io.util.SegmentedFile; -import org.apache.cassandra.utils.AlwaysPresentFilter; -import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; -import org.apache.cassandra.utils.concurrent.Transactional; - -public class TransactionLogTest extends AbstractTransactionalTest -{ - private static final String KEYSPACE = "TransactionLogsTest"; - - @BeforeClass - public static void setUp() - { - MockSchema.cleanup(); - } - - protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception - { - TransactionLog.waitForDeletions(); - SSTableReader.resetTidying(); - return new TxnTest(); - } - - private static final class TxnTest extends TestableTransaction - { - private final static class Transaction extends Transactional.AbstractTransactional implements Transactional - { - final ColumnFamilyStore cfs; - final TransactionLog txnLogs; - final SSTableReader sstableOld; - final SSTableReader sstableNew; - final TransactionLog.SSTableTidier tidier; - - Transaction(ColumnFamilyStore cfs, TransactionLog txnLogs) throws IOException - { - this.cfs = cfs; - this.txnLogs = txnLogs; - this.sstableOld = sstable(cfs, 0, 128); - this.sstableNew = sstable(cfs, 1, 128); - - assertNotNull(txnLogs); - assertNotNull(txnLogs.getId()); - Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType()); - - txnLogs.trackNew(sstableNew); - tidier = txnLogs.obsoleted(sstableOld); - assertNotNull(tidier); - } - - protected Throwable doCommit(Throwable accumulate) - { - sstableOld.markObsolete(tidier); - sstableOld.selfRef().release(); - TransactionLog.waitForDeletions(); - - Throwable ret = txnLogs.commit(accumulate); - - sstableNew.selfRef().release(); - return ret; - } - - protected Throwable doAbort(Throwable accumulate) - { - tidier.abort(); - TransactionLog.waitForDeletions(); - - Throwable ret = txnLogs.abort(accumulate); - - sstableNew.selfRef().release(); - sstableOld.selfRef().release(); - return ret; - } - - protected void doPrepare() - { - txnLogs.prepareToCommit(); - } - - void assertInProgress() throws Exception - { - assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), - sstableOld.getAllFilePaths(), - Collections.singleton(txnLogs.getData().getLogFile().file.getPath())))); - } - - void assertPrepared() throws Exception - { - } - - void assertAborted() throws Exception - { - assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); - } - - void assertCommitted() throws Exception - { - assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); - } - } - - final Transaction txn; - - private TxnTest() throws IOException - { - this(MockSchema.newCFS(KEYSPACE)); - } - - private TxnTest(ColumnFamilyStore cfs) throws IOException - { - this(cfs, new TransactionLog(OperationType.COMPACTION, cfs.metadata)); - } - - private TxnTest(ColumnFamilyStore cfs, TransactionLog txnLogs) throws IOException - { - this(new Transaction(cfs, txnLogs)); - } - - private TxnTest(Transaction txn) - { - super(txn); - this.txn = txn; - } - - protected void assertInProgress() throws Exception - { - txn.assertInProgress(); - } - - protected void assertPrepared() throws Exception - { - txn.assertPrepared(); - } - - protected void assertAborted() throws Exception - { - txn.assertAborted(); - } - - protected void assertCommitted() throws Exception - { - txn.assertCommitted(); - } - } - - @Test - public void testUntrack() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableNew = sstable(cfs, 1, 128); - - // complete a transaction without keep the new files since they were untracked - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstableNew); - transactionLog.untrackNew(sstableNew); - - transactionLog.finish(); - - sstableNew.selfRef().release(); - Thread.sleep(1); - TransactionLog.waitForDeletions(); - - assertFiles(transactionLog.getDataFolder(), Collections.<String>emptySet()); - } - - @Test - public void testCommitSameDesc() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld1 = sstable(cfs, 0, 128); - SSTableReader sstableOld2 = sstable(cfs, 0, 256); - SSTableReader sstableNew = sstable(cfs, 1, 128); - - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstableNew); - - sstableOld1.setReplaced(); - - TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld2); - assertNotNull(tidier); - - transactionLog.finish(); - - sstableOld2.markObsolete(tidier); - - sstableOld1.selfRef().release(); - sstableOld2.selfRef().release(); - - assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); - - sstableNew.selfRef().release(); - } - - @Test - public void testCommitOnlyNew() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); - - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstable); - transactionLog.finish(); - - assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); - - sstable.selfRef().release(); - } - - @Test - public void testCommitOnlyOld() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); - - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstable); - assertNotNull(tidier); - - transactionLog.finish(); - sstable.markObsolete(tidier); - sstable.selfRef().release(); - - assertFiles(transactionLog.getDataFolder(), new HashSet<>()); - } - - @Test - public void testAbortOnlyNew() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); - - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstable); - transactionLog.abort(); - - sstable.selfRef().release(); - - assertFiles(transactionLog.getDataFolder(), new HashSet<>()); - } - - @Test - public void testAbortOnlyOld() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); - - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstable); - assertNotNull(tidier); - - tidier.abort(); - transactionLog.abort(); - - sstable.selfRef().release(); - - assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); - } - - @Test - public void testRemoveUnfinishedLeftovers_abort() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); - - // simulate tracking sstables with a failed transaction (new log file NOT deleted) - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstableNew); - TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld); - - Set<File> tmpFiles = Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths().stream().map(File::new).collect(Collectors.toList()), - Collections.singleton(transactionLog.getData().getLogFile().file))); - - sstableNew.selfRef().release(); - sstableOld.selfRef().release(); - - Assert.assertEquals(tmpFiles, TransactionLog.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory)); - - // normally called at startup - TransactionLog.removeUnfinishedLeftovers(cfs.metadata); - - // sstableOld should be only table left - Directories directories = new Directories(cfs.metadata); - Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); - assertEquals(1, sstables.size()); - - assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); - - tidier.run(); - - // complete the transaction to avoid LEAK errors - transactionLog.close(); - } - - @Test - public void testRemoveUnfinishedLeftovers_commit() throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); - - // simulate tracking sstables with a committed transaction (new log file deleted) - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstableNew); - TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld); - - //Fake a commit - transactionLog.getData().getLogFile().commit(); - - Set<File> tmpFiles = Sets.newHashSet(Iterables.concat(sstableOld.getAllFilePaths().stream().map(p -> new File(p)).collect(Collectors.toList()), - Collections.singleton(transactionLog.getData().getLogFile().file))); - - sstableNew.selfRef().release(); - sstableOld.selfRef().release(); - - Assert.assertEquals(tmpFiles, TransactionLog.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory)); - - // normally called at startup - TransactionLog.removeUnfinishedLeftovers(cfs.metadata); - - // sstableNew should be only table left - Directories directories = new Directories(cfs.metadata); - Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); - assertEquals(1, sstables.size()); - - assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); - - tidier.run(); - - // complete the transaction to avoid LEAK errors - assertNull(transactionLog.complete(null)); - } - - @Test - public void testGetTemporaryFiles() throws IOException - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable1 = sstable(cfs, 0, 128); - - File dataFolder = sstable1.descriptor.directory; - - Set<File> tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder); - assertNotNull(tmpFiles); - assertEquals(0, tmpFiles.size()); - - TransactionLog transactionLog = new TransactionLog(OperationType.WRITE, cfs.metadata); - Directories directories = new Directories(cfs.metadata); - - File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); - - SSTableReader sstable2 = sstable(cfs, 1, 128); - transactionLog.trackNew(sstable2); - - Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); - assertEquals(2, sstables.size()); - - // this should contain sstable1, sstable2 and the transaction log file - File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); - - int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length; - assertEquals(numNewFiles - 1, sstable2.getAllFilePaths().size()); // new files except for transaction log file - - tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder); - assertNotNull(tmpFiles); - assertEquals(numNewFiles, tmpFiles.size()); - - File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA)); - File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX)); - - assertTrue(tmpFiles.contains(ssTable2DataFile)); - assertTrue(tmpFiles.contains(ssTable2IndexFile)); - - List<File> files = directories.sstableLister(Directories.OnTxnErr.THROW).listFiles(); - List<File> filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles(); - assertNotNull(files); - assertNotNull(filesNoTmp); - - assertTrue(files.contains(ssTable2DataFile)); - assertTrue(files.contains(ssTable2IndexFile)); - - assertFalse(filesNoTmp.contains(ssTable2DataFile)); - assertFalse(filesNoTmp.contains(ssTable2IndexFile)); - - transactionLog.finish(); - - //Now it should be empty since the transaction has finished - tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder); - assertNotNull(tmpFiles); - assertEquals(0, tmpFiles.size()); - - filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles(); - assertNotNull(filesNoTmp); - assertTrue(filesNoTmp.contains(ssTable2DataFile)); - assertTrue(filesNoTmp.contains(ssTable2IndexFile)); - - sstable1.selfRef().release(); - sstable2.selfRef().release(); - } - - @Test - public void testWrongChecksumLastLine() throws IOException - { - testCorruptRecord((t, s) -> - { // Fake a commit with invalid checksum - FileUtils.append(t.getData().getLogFile().file, - String.format("commit:[%d,0,0][%d]", - System.currentTimeMillis(), - 12345678L)); - }, - true); - } - - @Test - public void testWrongChecksumSecondFromLastLine() throws IOException - { - testCorruptRecord((t, s) -> - { // Fake two lines with invalid checksum - FileUtils.append(t.getData().getLogFile().file, - String.format("add:[ma-3-big,%d,4][%d]", - System.currentTimeMillis(), - 12345678L)); - - FileUtils.append(t.getData().getLogFile().file, - String.format("commit:[%d,0,0][%d]", - System.currentTimeMillis(), - 12345678L)); - }, - false); - } - - @Test - public void testWrongChecksumLastLineMissingFile() throws IOException - { - testCorruptRecord((t, s) -> - { // Fake a commit with invalid checksum and also delete one of the old files - for (String filePath : s.getAllFilePaths()) - { - if (filePath.endsWith("Data.db")) - { - FileUtils.delete(filePath); - break; - } - } - - FileUtils.append(t.getData().getLogFile().file, - String.format("commit:[%d,0,0][%d]", - System.currentTimeMillis(), - 12345678L)); - }, - false); - } - - @Test - public void testWrongChecksumLastLineWrongRecordFormat() throws IOException - { - testCorruptRecord((t, s) -> - { // Fake a commit with invalid checksum and a wrong record format (extra spaces) - FileUtils.append(t.getData().getLogFile().file, - String.format("commit:[%d ,0 ,0 ][%d]", - System.currentTimeMillis(), - 12345678L)); - }, - true); - } - - @Test - public void testMissingChecksumLastLine() throws IOException - { - testCorruptRecord((t, s) -> - { - // Fake a commit without a checksum - FileUtils.append(t.getData().getLogFile().file, - String.format("commit:[%d,0,0]", - System.currentTimeMillis())); - }, - true); - } - - @Test - public void testMissingChecksumSecondFromLastLine() throws IOException - { - testCorruptRecord((t, s) -> - { // Fake two lines without a checksum - FileUtils.append(t.getData().getLogFile().file, - String.format("add:[ma-3-big,%d,4]", - System.currentTimeMillis())); - - FileUtils.append(t.getData().getLogFile().file, - String.format("commit:[%d,0,0]", - System.currentTimeMillis())); - }, - false); - } - - private void testCorruptRecord(BiConsumer<TransactionLog, SSTableReader> modifier, boolean isRecoverable) throws IOException - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); - - File dataFolder = sstableOld.descriptor.directory; - - // simulate tracking sstables with a committed transaction except the checksum will be wrong - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstableNew); - transactionLog.obsoleted(sstableOld); - - //Modify the transaction log in some way - modifier.accept(transactionLog, sstableOld); - - String txnFilePath = transactionLog.getData().getLogFile().file.getPath(); - - assertNull(transactionLog.complete(null)); - - sstableOld.selfRef().release(); - sstableNew.selfRef().release(); - - if (isRecoverable) - { // the corruption is recoverable, we assume there is a commit record - - //This should return the old files and the tx log - assertFiles(Iterables.concat(sstableOld.getAllFilePaths(), Collections.singleton(txnFilePath)), - TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder)); - - //This should remove old files - TransactionLog.removeUnfinishedLeftovers(cfs.metadata); - - assertFiles(dataFolder.getPath(), Sets.newHashSet(sstableNew.getAllFilePaths())); - } - else - { // if an intermediate line was modified, we cannot tell, - // it should just throw and handle the exception with a log message - - //This should not return any files - assertEquals(Collections.emptyList(), new TransactionLog.FileLister(dataFolder.toPath(), - (file, type) -> type != Directories.FileType.FINAL, - Directories.OnTxnErr.IGNORE).list()); - - try - { - //This should throw a RuntimeException - new TransactionLog.FileLister(dataFolder.toPath(), - (file, type) -> type != Directories.FileType.FINAL, - Directories.OnTxnErr.THROW).list(); - fail("Expected exception"); - } - catch (RuntimeException ex) - { - // pass - ex.printStackTrace(); - } - - //This should not remove any files - TransactionLog.removeUnfinishedLeftovers(cfs.metadata); - - assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), - sstableOld.getAllFilePaths(), - Collections.singleton(txnFilePath))), - true); - } - } - - @Test - public void testObsoletedDataFileUpdateTimeChanged() throws IOException - { - testObsoletedFilesChanged(sstable -> - { - // increase the modification time of the Data file - for (String filePath : sstable.getAllFilePaths()) - { - if (filePath.endsWith("Data.db")) - assertTrue(new File(filePath).setLastModified(System.currentTimeMillis() + 60000)); //one minute later - } - }); - } - - private static void testObsoletedFilesChanged(Consumer<SSTableReader> modifier) throws IOException - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); - - // simulate tracking sstables with a committed transaction except the checksum will be wrong - TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLog); - - transactionLog.trackNew(sstableNew); - /*TransactionLog.SSTableTidier tidier =*/ transactionLog.obsoleted(sstableOld); - - //modify the old sstable files - modifier.accept(sstableOld); - - //Fake a commit - transactionLog.getData().getLogFile().commit(); - - //This should not remove the old files - TransactionLog.removeUnfinishedLeftovers(cfs.metadata); - - assertFiles(transactionLog.getDataFolder(), Sets.newHashSet(Iterables.concat( - sstableNew.getAllFilePaths(), - sstableOld.getAllFilePaths(), - Collections.singleton(transactionLog.getData().getLogFile().file.getPath())))); - - sstableOld.selfRef().release(); - sstableNew.selfRef().release(); - - // complete the transaction to avoid LEAK errors - assertNull(transactionLog.complete(null)); - - assertFiles(transactionLog.getDataFolder(), Sets.newHashSet(Iterables.concat( - sstableNew.getAllFilePaths(), - sstableOld.getAllFilePaths(), - Collections.singleton(transactionLog.getData().getLogFile().file.getPath())))); - } - - @Test - public void testGetTemporaryFilesSafeAfterObsoletion_1() throws Throwable - { - testGetTemporaryFilesSafeAfterObsoletion(true); - } - - @Test - public void testGetTemporaryFilesSafeAfterObsoletion_2() throws Throwable - { - testGetTemporaryFilesSafeAfterObsoletion(false); - } - - private void testGetTemporaryFilesSafeAfterObsoletion(boolean finishBefore) throws Throwable - { - ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); - File dataFolder = sstable.descriptor.directory; - - TransactionLog transactionLogs = new TransactionLog(OperationType.COMPACTION, cfs.metadata); - assertNotNull(transactionLogs); - - TransactionLog.SSTableTidier tidier = transactionLogs.obsoleted(sstable); - - if (finishBefore) - transactionLogs.finish(); - - sstable.markObsolete(tidier); - sstable.selfRef().release(); - - for (int i = 0; i < 100; i++) - { - // This should race with the asynchronous deletion of txn log files - // It doesn't matter what it returns but it should not throw - TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder); - } - - if (!finishBefore) - transactionLogs.finish(); - } - - private static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException - { - Directories dir = new Directories(cfs.metadata); - Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getTableName(), generation); - Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); - for (Component component : components) - { - File file = new File(descriptor.filenameFor(component)); - if (!file.exists()) - assertTrue(file.createNewFile()); - try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) - { - raf.setLength(size); - } - } - - SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); - SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); - - SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList()); - StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) - .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header) - .get(MetadataType.STATS); - SSTableReader reader = SSTableReader.internalOpen(descriptor, - components, - cfs.metadata, - dFile, - iFile, - MockSchema.indexSummary.sharedCopy(), - new AlwaysPresentFilter(), - 1L, - metadata, - SSTableReader.OpenReason.NORMAL, - header); - reader.first = reader.last = MockSchema.readerBounds(generation); - return reader; - } - - private static void assertFiles(String dirPath, Set<String> expectedFiles) - { - assertFiles(dirPath, expectedFiles, false); - } - - private static void assertFiles(String dirPath, Set<String> expectedFiles, boolean excludeNonExistingFiles) - { - TransactionLog.waitForDeletions(); - - File dir = new File(dirPath); - File[] files = dir.listFiles(); - if (files != null) - { - for (File file : files) - { - if (file.isDirectory()) - continue; - - String filePath = file.getPath(); - assertTrue(filePath, expectedFiles.contains(filePath)); - expectedFiles.remove(filePath); - } - } - - if (excludeNonExistingFiles) - { - for (String filePath : expectedFiles) - { - File file = new File(filePath); - if (!file.exists()) - expectedFiles.remove(filePath); - } - } - - assertTrue(expectedFiles.toString(), expectedFiles.isEmpty()); - } - - private static void assertFiles(Iterable<String> filePaths, Set<File> expectedFiles) - { - for (String filePath : filePaths) - { - File file = new File(filePath); - assertTrue(filePath, expectedFiles.contains(file)); - expectedFiles.remove(file); - } - - assertTrue(expectedFiles.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 1c61f51..942c7f9 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -44,7 +44,6 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.EncodingStats; -import org.apache.cassandra.db.lifecycle.TransactionLog; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionController; @@ -109,7 +108,7 @@ public class SSTableRewriterTest extends SchemaLoader Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); store.truncateBlocking(); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); } @Test @@ -145,7 +144,7 @@ public class SSTableRewriterTest extends SchemaLoader } writer.finish(); } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list()); assertEquals(1, filecounts); @@ -177,7 +176,7 @@ public class SSTableRewriterTest extends SchemaLoader } writer.finish(); } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list()); assertEquals(1, filecounts); @@ -232,7 +231,7 @@ public class SSTableRewriterTest extends SchemaLoader assertTrue(checked); writer.finish(); } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list()); assertEquals(1, filecounts); @@ -277,12 +276,12 @@ public class SSTableRewriterTest extends SchemaLoader // open till .abort() is called (via the builder) if (!FBUtilities.isWindows()) { - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); assertFileCounts(dir.list()); } writer.abort(); txn.abort(); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); int datafiles = assertFileCounts(dir.list()); assertEquals(datafiles, 0); validateCFS(cfs); @@ -328,7 +327,7 @@ public class SSTableRewriterTest extends SchemaLoader sstables = rewriter.finish(); } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); long sum = 0; for (SSTableReader x : cfs.getLiveSSTables()) @@ -337,7 +336,7 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, StorageMetrics.load.getCount()); assertEquals(files, sstables.size()); assertEquals(files, cfs.getLiveSSTables().size()); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); // tmplink and tmp files should be gone: assertEquals(sum, cfs.metric.totalDiskSpaceUsed.getCount()); @@ -382,7 +381,7 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(files, sstables.size()); assertEquals(files, cfs.getLiveSSTables().size()); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); @@ -519,7 +518,7 @@ public class SSTableRewriterTest extends SchemaLoader test.run(scanner, controller, s, cfs, rewriter, txn); } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(1, cfs.getLiveSSTables().size()); @@ -567,7 +566,7 @@ public class SSTableRewriterTest extends SchemaLoader } } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); assertEquals(files - 1, cfs.getLiveSSTables().size()); // we never wrote anything to the last file assertFileCounts(s.descriptor.directory.list()); @@ -609,7 +608,7 @@ public class SSTableRewriterTest extends SchemaLoader sstables = rewriter.finish(); } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); } @@ -650,7 +649,7 @@ public class SSTableRewriterTest extends SchemaLoader } assertEquals(files, sstables.size()); assertEquals(files, cfs.getLiveSSTables().size()); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); @@ -670,7 +669,7 @@ public class SSTableRewriterTest extends SchemaLoader splitter.split(); assertFileCounts(s.descriptor.directory.list()); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); for (File f : s.descriptor.directory.listFiles()) { @@ -746,7 +745,7 @@ public class SSTableRewriterTest extends SchemaLoader s.selfRef().release(); } - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); int filecount = assertFileCounts(s.descriptor.directory.list()); assertEquals(filecount, 1); @@ -825,7 +824,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.finish(); } validateKeys(keyspace); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); validateCFS(cfs); truncate(cfs); } @@ -923,7 +922,7 @@ public class SSTableRewriterTest extends SchemaLoader public static void truncate(ColumnFamilyStore cfs) { cfs.truncateBlocking(); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS); assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/test/unit/org/apache/cassandra/schema/DefsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java index 5eed80f..ee73b2b 100644 --- a/test/unit/org/apache/cassandra/schema/DefsTest.java +++ b/test/unit/org/apache/cassandra/schema/DefsTest.java @@ -40,7 +40,7 @@ import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.lifecycle.TransactionLog; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.ConfigurationException; @@ -546,7 +546,7 @@ public class DefsTest // check assertTrue(cfs.indexManager.listIndexes().isEmpty()); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); assertFalse(new File(desc.filenameFor(Component.DATA)).exists()); }