This is an automated email from the ASF dual-hosted git repository. snazy pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
View the commit online: https://github.com/apache/cassandra/commit/c2e11bd4224b2110abe6aa84c8882e85980e3491 The following commit(s) were added to refs/heads/trunk by this push: new c2e11bd Close channels on error c2e11bd is described below commit c2e11bd4224b2110abe6aa84c8882e85980e3491 Author: Ekaterina Dimitrova <ekaterina.dimitr...@datastax.com> AuthorDate: Fri Nov 8 15:15:20 2019 -0500 Close channels on error Patch by Ekaterina Dimitrova, reviewed by Robert Stupp for CASSANDRA-15407 --- CHANGES.txt | 1 + .../cassandra/hints/ChecksummedDataInput.java | 11 +++++- .../hints/CompressedChecksummedDataInput.java | 11 +++++- .../hints/EncryptedChecksummedDataInput.java | 11 +++++- .../org/apache/cassandra/hints/HintsWriter.java | 12 +++--- .../org/apache/cassandra/io/util/FileHandle.java | 9 +++++ .../org/apache/cassandra/utils/Throwables.java | 45 ++++++++++++++++++++++ 7 files changed, 91 insertions(+), 9 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index da57886..d44306c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * Optimise native protocol ASCII string encoding (CASSANDRA-15410) * Make sure all exceptions are propagated in DebuggableThreadPoolExecutor (CASSANDRA-15332) * Make it possible to resize concurrent read / write thread pools at runtime (CASSANDRA-15277) + * Close channels on error (CASSANDRA-15407) Merged from 2.2: * In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371) diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 6ebc830..30d18fa 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.*; +import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.memory.BufferPool; @@ -74,7 +75,15 @@ public class ChecksummedDataInput extends RebufferingInputStream @SuppressWarnings("resource") public static ChecksummedDataInput open(File file) { - return new ChecksummedDataInput(new ChannelProxy(file)); + ChannelProxy channel = new ChannelProxy(file); + try + { + return new ChecksummedDataInput(channel); + } + catch (Throwable t) + { + throw Throwables.cleaned(channel.close(t)); + } } public boolean isEOF() diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java index 4982a03..0381b00 100644 --- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java @@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.Throwables; public final class CompressedChecksummedDataInput extends ChecksummedDataInput { @@ -160,7 +161,15 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput long position = input.getPosition(); input.close(); - return new CompressedChecksummedDataInput(new ChannelProxy(input.getPath()), compressor, position); + ChannelProxy channel = new ChannelProxy(input.getPath()); + try + { + return new CompressedChecksummedDataInput(channel, compressor, position); + } + catch (Throwable t) + { + throw Throwables.cleaned(channel.close(t)); + } } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java index a70a443..5edd8a8 100644 --- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java @@ -28,6 +28,7 @@ import org.apache.cassandra.security.EncryptionUtils; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.utils.Throwables; public class EncryptedChecksummedDataInput extends ChecksummedDataInput { @@ -137,7 +138,15 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput long position = input.getPosition(); input.close(); - return new EncryptedChecksummedDataInput(new ChannelProxy(input.getPath()), cipher, compressor, position); + ChannelProxy channel = new ChannelProxy(input.getPath()); + try + { + return new EncryptedChecksummedDataInput(channel, cipher, compressor, position); + } + catch (Throwable t) + { + throw Throwables.cleaned(channel.close(t)); + } } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java index 5997eb4..589802b 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriter.java +++ b/src/java/org/apache/cassandra/hints/HintsWriter.java @@ -81,18 +81,18 @@ class HintsWriter implements AutoCloseable ByteBuffer descriptorBytes = dob.buffer(); updateChecksum(crc, descriptorBytes); channel.write(descriptorBytes); + + if (descriptor.isEncrypted()) + return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc); + if (descriptor.isCompressed()) + return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc); + return new HintsWriter(directory, descriptor, file, channel, fd, crc); } catch (Throwable e) { channel.close(); throw e; } - - if (descriptor.isEncrypted()) - return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc); - if (descriptor.isCompressed()) - return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc); - return new HintsWriter(directory, descriptor, file, channel, fd, crc); } HintsDescriptor descriptor() diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java index a3afc2f..b705769 100644 --- a/src/java/org/apache/cassandra/io/util/FileHandle.java +++ b/src/java/org/apache/cassandra/io/util/FileHandle.java @@ -33,6 +33,7 @@ import org.apache.cassandra.utils.concurrent.RefCounted; import org.apache.cassandra.utils.concurrent.SharedCloseableImpl; import static org.apache.cassandra.utils.Throwables.maybeFail; +import org.apache.cassandra.utils.Throwables; /** * {@link FileHandle} provides access to a file for reading, including the ones written by various {@link SequentialWriter} @@ -341,9 +342,11 @@ public class FileHandle extends SharedCloseableImpl @SuppressWarnings("resource") public FileHandle complete(long overrideLength) { + boolean channelOpened = false; if (channel == null) { channel = new ChannelProxy(path); + channelOpened = true; } ChannelProxy channelCopy = channel.sharedCopy(); @@ -388,6 +391,12 @@ public class FileHandle extends SharedCloseableImpl catch (Throwable t) { channelCopy.close(); + if (channelOpened) + { + ChannelProxy c = channel; + channel = null; + throw Throwables.cleaned(c.close(t)); + } throw t; } } diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java index 5d6d96f..9c6da60 100644 --- a/src/java/org/apache/cassandra/utils/Throwables.java +++ b/src/java/org/apache/cassandra/utils/Throwables.java @@ -20,9 +20,12 @@ package org.apache.cassandra.utils; import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Iterator; import java.util.Optional; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.function.Predicate; import java.util.stream.Stream; @@ -193,4 +196,46 @@ public final class Throwables } return Optional.empty(); } + + /** + * If the provided throwable is a "wrapping" exception (see below), return the cause of that throwable, otherwise + * return its argument untouched. + * <p> + * We call a "wrapping" exception in the context of that method an exception whose only purpose is to wrap another + * exception, and currently this method recognize only 2 exception as "wrapping" ones: {@link ExecutionException} + * and {@link CompletionException}. + */ + public static Throwable unwrapped(Throwable t) + { + Throwable unwrapped = t; + while (unwrapped instanceof CompletionException || + unwrapped instanceof ExecutionException || + unwrapped instanceof InvocationTargetException) + unwrapped = unwrapped.getCause(); + + // I don't think it make sense for those 2 exception classes to ever be used with null causes, but no point + // in failing here if this happen. We still wrap the original exception if that happen so we get a sign + // that the assumption of this method is wrong. + return unwrapped == null + ? new RuntimeException("Got wrapping exception not wrapping anything", t) + : unwrapped; + } + + /** + * If the provided exception is unchecked, return it directly, otherwise wrap it into a {@link RuntimeException} + * to make it unchecked. + */ + public static RuntimeException unchecked(Throwable t) + { + return t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException(t); + } + + /** + * A shortcut for {@code unchecked(unwrapped(t))}. This is called "cleaned" because this basically removes the annoying + * cruft surrounding an exception :). + */ + public static RuntimeException cleaned(Throwable t) + { + return unchecked(unwrapped(t)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org