Revert AutoSavingCache stream factory to OutputStream Adds a "non-transactional" flag to SequentialWriter to convert its semantics to a plain OutputStream
patch by bdeggleston; reviewed by benedict Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d Branch: refs/heads/trunk Commit: a3fc425dff25c42a49af38e87aa33501d4224195 Parents: 7a85c8b Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Fri Aug 14 12:29:27 2015 -0700 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Fri Aug 21 09:55:42 2015 +0100 ---------------------------------------------------------------------- .../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++---------- .../cassandra/io/util/SequentialWriter.java | 12 +++++++- .../utils/concurrent/Transactional.java | 8 +++--- .../cassandra/io/util/DataOutputTest.java | 3 +- .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++ 5 files changed, 54 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 05653ba..f0f4e8a 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K { public interface IStreamFactory { - public InputStream getInputStream(File dataPath, File crcPath) throws IOException; - public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException; + InputStream getInputStream(File dataPath, File crcPath) throws IOException; + OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException; } private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class); @@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K return ChecksummedRandomAccessReader.open(dataPath, crcPath); } - public SequentialWriter getOutputWriter(File dataPath, File crcPath) + public OutputStream getOutputStream(File dataPath, File crcPath) { - return SequentialWriter.open(dataPath, crcPath); + return SequentialWriter.open(dataPath, crcPath).finishOnClose(); } }; @@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K long start = System.nanoTime(); - HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>(); - HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>(); + HashMap<UUID, DataOutputPlus> writers = new HashMap<>(); + HashMap<UUID, OutputStream> streams = new HashMap<>(); HashMap<UUID, Pair<File, File>> paths = new HashMap<>(); try @@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K if (!Schema.instance.hasCF(key.getCFId())) continue; // the table has been dropped. - DataOutputPlus writer = dataOutputs.get(cfId); + DataOutputPlus writer = writers.get(cfId); if (writer == null) { Pair<File, File> cacheFilePaths = tempCacheFiles(cfId); - SequentialWriter sequentialWriter; + OutputStream stream; try { - sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right); - writer = new WrappedDataOutputStreamPlus(sequentialWriter); + stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right); + writer = new WrappedDataOutputStreamPlus(stream); } catch (FileNotFoundException e) { throw new RuntimeException(e); } paths.put(cfId, cacheFilePaths); - sequentialWriters.put(cfId, sequentialWriter); - dataOutputs.put(cfId, writer); + streams.put(cfId, stream); + writers.put(cfId, writer); } try @@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K // not thrown (by OHC) } - for (SequentialWriter writer : sequentialWriters.values()) + for (OutputStream writer : streams.values()) { - writer.finish(); FileUtils.closeQuietly(writer); } } - for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet()) + for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet()) { UUID cfId = entry.getKey(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 915133f..0c39469 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne protected Runnable runPostFlush; private final TransactionalProxy txnProxy = txnProxy(); + private boolean finishOnClose; protected Descriptor descriptor; // due to lack of multiple-inheritance, we proxy our transactional implementation @@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector); } + public SequentialWriter finishOnClose() + { + finishOnClose = true; + return this; + } + public void write(int value) throws ClosedChannelException { if (buffer == null) @@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne @Override public final void close() { - txnProxy.close(); + if (finishOnClose) + txnProxy.finish(); + else + txnProxy.close(); } public final void finish() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java index 5b0eb8e..85c3de5 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java @@ -18,10 +18,6 @@ */ package org.apache.cassandra.utils.concurrent; -import java.util.Set; - -import com.google.common.collect.ImmutableSet; - import static org.apache.cassandra.utils.Throwables.maybeFail; import static org.apache.cassandra.utils.Throwables.merge; @@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge; * of the system should be, and so simply logging the exception is likely best (since it may have been an issue * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown * should be checked and ruled out during commit preparation. + * + * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit() + * aren't made prior to calling close(), the semantics of its close() method differ significantly from + * most AutoCloseable implementations. */ public interface Transactional extends AutoCloseable { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java index 70993d3..bbdf4e1 100644 --- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java +++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java @@ -179,10 +179,9 @@ public class DataOutputTest { File file = FileUtils.createTempFile("dataoutput", "test"); final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP); - DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer); + DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose()); DataInput canon = testWrite(write); write.flush(); - writer.finish(); write.close(); DataInputStream test = new DataInputStream(new FileInputStream(file)); testRead(test, canon); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java index ce0f918..fd38427 100644 --- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java @@ -18,6 +18,7 @@ */ package org.apache.cassandra.io.util; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -25,7 +26,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import com.google.common.io.Files; import org.junit.After; +import org.junit.Test; import junit.framework.Assert; @@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest } } + /** + * Tests that the output stream exposed by SequentialWriter behaves as expected + */ + @Test + public void outputStream() + { + File tempFile = new File(Files.createTempDir(), "test.txt"); + Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists()); + + try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose())) + { + os.writeUTF("123"); + } + catch (IOException e) + { + Assert.fail(); + } + + Assert.assertTrue("temp file should exist", tempFile.exists()); + } + }