Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 aad3ae2cb -> bc89bc66c refs/heads/trunk 09c94a667 -> dec76593f
Partially revert #9839 to remove reference loop Patch by Sam Tunnicliffe; reviewed by Benedict Elliot Smith for CASSANDRA-10543 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc89bc66 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc89bc66 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc89bc66 Branch: refs/heads/cassandra-3.0 Commit: bc89bc66cb762da2be61b92d56b48154d8bd3cbf Parents: aad3ae2 Author: Sam Tunnicliffe <s...@beobal.com> Authored: Fri Oct 16 17:39:07 2015 +0100 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Fri Oct 16 20:19:37 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 4 +++ .../compress/CompressedRandomAccessReader.java | 16 +++++++----- .../io/sstable/format/SSTableReader.java | 23 +++++++++++++---- .../cassandra/io/util/IChecksummedFile.java | 27 -------------------- .../cassandra/io/util/ICompressedFile.java | 2 +- .../apache/cassandra/io/util/SegmentedFile.java | 14 +--------- .../cassandra/schema/CompressionParams.java | 12 +++++++++ .../miscellaneous/CrcCheckChanceTest.java | 19 +++++++++++++- 9 files changed, 64 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 77facc4..cb4c2d8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0-rc2 + * Remove circular references in SegmentedFile (CASSANDRA-10543) * Ensure validation of indexed values only occurs once per-partition (CASSANDRA-10536) * Fix handling of static columns for range tombstones in thrift (CASSANDRA-10174) * Support empty ColumnFilter for backward compatility on empty IN (CASSANDRA-10471) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 4c9fc55..0b838bf 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2044,7 +2044,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { TableParams.builder().crcCheckChance(crcCheckChance).build().validate(); for (ColumnFamilyStore cfs : concatWithIndexes()) + { cfs.crcCheckChance.set(crcCheckChance); + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.LIVE)) + sstable.setCrcCheckChance(crcCheckChance); + } } catch (ConfigurationException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index b2759e6..329d932 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -23,6 +23,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.zip.Checksum; import java.util.function.Supplier; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import org.apache.cassandra.io.FSReadError; @@ -46,14 +47,18 @@ public class CompressedRandomAccessReader extends RandomAccessReader // raw checksum bytes private ByteBuffer checksumBytes; - private final Supplier<Double> crcCheckChanceSupplier; + + @VisibleForTesting + public double getCrcCheckChance() + { + return 
metadata.parameters.getCrcCheckChance(); + } protected CompressedRandomAccessReader(Builder builder) { super(builder); this.metadata = builder.metadata; this.checksum = metadata.checksumType.newInstance(); - crcCheckChanceSupplier = builder.crcCheckChanceSupplier; if (regions == null) { @@ -124,7 +129,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader buffer.flip(); } - if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) + if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) { compressed.rewind(); metadata.checksumType.update( checksum, (compressed)); @@ -186,7 +191,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader buffer.flip(); } - if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) + if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) { compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); @@ -239,21 +244,18 @@ public class CompressedRandomAccessReader extends RandomAccessReader public final static class Builder extends RandomAccessReader.Builder { private final CompressionMetadata metadata; - private final Supplier<Double> crcCheckChanceSupplier; public Builder(ICompressedFile file) { super(file.channel()); this.metadata = applyMetadata(file.getMetadata()); this.regions = file.regions(); - this.crcCheckChanceSupplier = file.getCrcCheckChanceSupplier(); } public Builder(ChannelProxy channel, CompressionMetadata metadata) { super(channel); this.metadata = applyMetadata(metadata); - this.crcCheckChanceSupplier = (() -> 1.0); //100% crc_check_chance } private CompressionMetadata applyMetadata(CompressionMetadata metadata) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 8d23597..afd0a1e 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -217,6 +217,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS private RestorableMeter readMeter; + private volatile double crcCheckChance; + /** * Calculate approximate key count. * If cardinality estimator is available on all given sstables, then this method use them to estimate @@ -657,10 +659,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS keyCache = CacheService.instance.keyCache; final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId); if (cfs != null) - { - ifile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance); - dfile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance); - } + setCrcCheckChance(cfs.getCrcCheckChance()); } public boolean isKeyCacheSetup() @@ -1644,7 +1643,21 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS @VisibleForTesting public double getCrcCheckChance() { - return dfile.getCrcCheckChanceSupplier().get(); + return crcCheckChance; + } + + /** + * Set the value of CRC check chance. The argument supplied is obtained + * from the property of the owning CFS. 
Called when either the SSTR + * is initialized, or the CFS's property is updated via JMX + * @param crcCheckChance + */ + public void setCrcCheckChance(double crcCheckChance) + { + this.crcCheckChance = crcCheckChance; + if (compression) + ((CompressedSegmentedFile)dfile).metadata.parameters.setCrcCheckChance(crcCheckChance); + } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/src/java/org/apache/cassandra/io/util/IChecksummedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/IChecksummedFile.java b/src/java/org/apache/cassandra/io/util/IChecksummedFile.java deleted file mode 100644 index fa15a5e..0000000 --- a/src/java/org/apache/cassandra/io/util/IChecksummedFile.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.io.util; - -import java.util.function.Supplier; - -public interface IChecksummedFile -{ - public Supplier<Double> getCrcCheckChanceSupplier(); - public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/src/java/org/apache/cassandra/io/util/ICompressedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java index c149fd1..43cef8c 100644 --- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java +++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java @@ -19,7 +19,7 @@ package org.apache.cassandra.io.util; import org.apache.cassandra.io.compress.CompressionMetadata; -public interface ICompressedFile extends IChecksummedFile +public interface ICompressedFile { ChannelProxy channel(); CompressionMetadata getMetadata(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index c827255..ab2d291 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -49,7 +49,7 @@ import static org.apache.cassandra.utils.Throwables.maybeFail; * would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for * each access to that segment. 
*/ -public abstract class SegmentedFile extends SharedCloseableImpl implements IChecksummedFile +public abstract class SegmentedFile extends SharedCloseableImpl { public final ChannelProxy channel; public final int bufferSize; @@ -58,8 +58,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl implements IChec // This differs from length for compressed files (but we still need length for // SegmentIterator because offsets in the file are relative to the uncompressed size) public final long onDiskLength; - private Supplier<Double> crcCheckChanceSupplier = () -> 1.0; - /** * Use getBuilder to get a Builder to construct a SegmentedFile. @@ -137,16 +135,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl implements IChec return reader; } - public Supplier<Double> getCrcCheckChanceSupplier() - { - return crcCheckChanceSupplier; - } - - public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier) - { - this.crcCheckChanceSupplier = crcCheckChanceSupplier; - } - public void dropPageCache(long before) { CLibrary.trySkipCache(channel.getFileDescriptor(), 0, before, path()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/src/java/org/apache/cassandra/schema/CompressionParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index 7f46718..cd1686f 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -71,6 +71,8 @@ public final class CompressionParams private final Integer chunkLength; private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be used by the compressor + private volatile double crcCheckChance = 1.0; + public static CompressionParams fromMap(Map<String, String> opts) { Map<String, String> options = copyOptions(opts); @@ 
-455,6 +457,16 @@ public final class CompressionParams return String.valueOf(chunkLength() / 1024); } + public void setCrcCheckChance(double crcCheckChance) + { + this.crcCheckChance = crcCheckChance; + } + + public double getCrcCheckChance() + { + return crcCheckChance; + } + @Override public boolean equals(Object obj) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc89bc66/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java index 3a68e4a..d059f7d 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java @@ -30,6 +30,8 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.io.compress.CompressedRandomAccessReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.FBUtilities; @@ -70,6 +72,7 @@ public class CrcCheckChanceTest extends CQLTester Assert.assertEquals(0.99, cfs.getCrcCheckChance()); Assert.assertEquals(0.99, cfs.getLiveSSTables().iterator().next().getCrcCheckChance()); + Assert.assertEquals(0.99, indexCfs.getCrcCheckChance()); Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance()); @@ -145,8 +148,22 @@ public class CrcCheckChanceTest extends CQLTester Assert.assertEquals(0.03, cfs.getLiveSSTables().iterator().next().getCrcCheckChance()); Assert.assertEquals(0.03, indexCfs.getCrcCheckChance()); Assert.assertEquals(0.03, 
indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance()); - } + // Also check that any open readers also use the updated value + // note: only compressed files currently perform crc checks, so only the dfile reader is relevant here + SSTableReader baseSSTable = cfs.getLiveSSTables().iterator().next(); + SSTableReader idxSSTable = indexCfs.getLiveSSTables().iterator().next(); + try (CompressedRandomAccessReader baseDataReader = (CompressedRandomAccessReader)baseSSTable.openDataReader(); + CompressedRandomAccessReader idxDataReader = (CompressedRandomAccessReader)idxSSTable.openDataReader()) + { + Assert.assertEquals(0.03, baseDataReader.getCrcCheckChance()); + Assert.assertEquals(0.03, idxDataReader.getCrcCheckChance()); + + cfs.setCrcCheckChance(0.31); + Assert.assertEquals(0.31, baseDataReader.getCrcCheckChance()); + Assert.assertEquals(0.31, idxDataReader.getCrcCheckChance()); + } + } @Test public void testDropDuringCompaction() throws Throwable