Add uncompressed chunk cache for RandomAccessReader

patch by Branimir Lambov; reviewed by Pavel Yaskevich for CASSANDRA-5863


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/30bb255e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/30bb255e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/30bb255e

Branch: refs/heads/trunk
Commit: 30bb255ec9fb36ace2aab51474bd3bfb9bbd3bed
Parents: 16e3507
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Mar 2 11:15:44 2016 +0200
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Thu Apr 28 22:14:42 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  11 +-
 lib/caffeine-2.2.6.jar                          | Bin 0 -> 947685 bytes
 lib/licenses/caffeine-2.2.6.txt                 | 202 ++++++++++
 .../org/apache/cassandra/cache/CacheSize.java   |  14 +
 .../org/apache/cassandra/cache/ChunkCache.java  | 301 +++++++++++++++
 src/java/org/apache/cassandra/cache/ICache.java |  10 +-
 .../cassandra/db/compaction/Scrubber.java       |   2 +-
 .../cassandra/hints/ChecksummedDataInput.java   | 104 +++--
 .../hints/CompressedChecksummedDataInput.java   |  44 +--
 .../CompressedChecksummedDataInputBuilder.java  |  36 --
 .../hints/EncryptedChecksummedDataInput.java    |  51 +--
 .../org/apache/cassandra/hints/HintsReader.java |  10 +-
 .../compress/CompressedRandomAccessReader.java  | 286 --------------
 .../io/sstable/format/SSTableReader.java        |   2 +-
 .../io/sstable/format/big/BigTableWriter.java   |  13 +
 .../io/util/AbstractReaderFileProxy.java        |  61 +++
 .../io/util/BufferManagingRebufferer.java       | 127 +++++++
 .../io/util/BufferedSegmentedFile.java          |  15 +-
 .../apache/cassandra/io/util/ChannelProxy.java  |   1 +
 .../io/util/ChecksummedRandomAccessReader.java  | 109 +++---
 .../apache/cassandra/io/util/ChunkReader.java   |  56 +++
 .../io/util/CompressedSegmentedFile.java        | 283 ++++++++++++--
 .../io/util/DataIntegrityMetadata.java          |  13 +-
 .../cassandra/io/util/ICompressedFile.java      |   2 -
 .../cassandra/io/util/LimitingRebufferer.java   | 106 ++++++
 .../cassandra/io/util/MmapRebufferer.java       |  49 +++
 .../cassandra/io/util/MmappedRegions.java       |  33 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |  58 +--
 .../cassandra/io/util/RandomAccessReader.java   | 257 +++++--------
 .../cassandra/io/util/ReaderFileProxy.java      |  36 ++
 .../apache/cassandra/io/util/Rebufferer.java    |  84 +++++
 .../cassandra/io/util/RebuffererFactory.java    |  32 ++
 .../apache/cassandra/io/util/SegmentedFile.java |  56 +--
 .../cassandra/io/util/SimpleChunkReader.java    |  78 ++++
 .../apache/cassandra/metrics/CacheMetrics.java  |   4 +-
 .../cassandra/metrics/CacheMissMetrics.java     | 114 ++++++
 .../compress/CompressedInputStream.java         |  11 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   9 +
 .../apache/cassandra/tools/nodetool/Info.java   |  22 ++
 .../apache/cassandra/utils/ChecksumType.java    |  28 +-
 .../cassandra/utils/memory/BufferPool.java      |   2 +-
 .../org/apache/cassandra/cql3/CachingBench.java | 375 +++++++++++++++++++
 .../org/apache/cassandra/cql3/CQLTester.java    |   6 +-
 .../selection/SelectionColumnMappingTest.java   |   2 +
 .../entities/FrozenCollectionsTest.java         |   4 +-
 .../cql3/validation/entities/UserTypesTest.java |   4 +-
 .../miscellaneous/CrcCheckChanceTest.java       |   6 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   3 +
 .../org/apache/cassandra/db/VerifyTest.java     |   3 +
 .../compaction/BlacklistingCompactionsTest.java |   3 +
 .../hints/ChecksummedDataInputTest.java         |   6 +-
 .../CompressedRandomAccessReaderTest.java       |  29 +-
 .../CompressedSequentialWriterTest.java         |   2 +-
 .../io/util/BufferedRandomAccessFileTest.java   |   5 +-
 .../cassandra/io/util/MmappedRegionsTest.java   |  32 +-
 .../io/util/RandomAccessReaderTest.java         |  27 +-
 57 files changed, 2387 insertions(+), 853 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 78518b6..4f7d6dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Add uncompressed chunk cache for RandomAccessReader (CASSANDRA-5863)
  * Clarify ClusteringPrefix hierarchy (CASSANDRA-11213)
  * Always perform collision check before joining ring (CASSANDRA-10134)
  * SSTableWriter output discrepancy (CASSANDRA-11646)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 582859c..48bad2c 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -380,9 +380,14 @@ concurrent_counter_writes: 32
 # be limited by the less of concurrent reads or concurrent writes.
 concurrent_materialized_view_writes: 32
 
-# Maximum memory to use for pooling sstable buffers. Defaults to the smaller
-# of 1/4 of heap or 512MB. This pool is allocated off-heap, so is in addition
-# to the memory allocated for heap. Memory is only allocated as needed.
+# Maximum memory to use for sstable chunk cache and buffer pooling.
+# 32MB of this are reserved for pooling buffers, the rest is used as a
+# cache that holds uncompressed sstable chunks.
+# Defaults to the smaller of 1/4 of heap or 512MB. This pool is allocated off-heap,
+# so is in addition to the memory allocated for heap. The cache also has on-heap
+# overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size
+# if the default 64k chunk size is used).
+# Memory is only allocated when needed.
 # file_cache_size_in_mb: 512
 
 # Flag indicating whether to allocate on or off heap when the sstable buffer

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/lib/caffeine-2.2.6.jar
----------------------------------------------------------------------
diff --git a/lib/caffeine-2.2.6.jar b/lib/caffeine-2.2.6.jar
new file mode 100644
index 0000000..74b91bc
Binary files /dev/null and b/lib/caffeine-2.2.6.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/lib/licenses/caffeine-2.2.6.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/caffeine-2.2.6.txt b/lib/licenses/caffeine-2.2.6.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/caffeine-2.2.6.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/cache/CacheSize.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CacheSize.java 
b/src/java/org/apache/cassandra/cache/CacheSize.java
new file mode 100644
index 0000000..561c73d
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/CacheSize.java
@@ -0,0 +1,14 @@
+package org.apache.cassandra.cache;
+
+public interface CacheSize
+{
+
+    long capacity();
+
+    void setCapacity(long capacity);
+
+    int size();
+
+    long weightedSize();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/cache/ChunkCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java 
b/src/java/org/apache/cassandra/cache/ChunkCache.java
new file mode 100644
index 0000000..faf41b4
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -0,0 +1,301 @@
+package org.apache.cassandra.cache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.github.benmanes.caffeine.cache.*;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.metrics.CacheMissMetrics;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public class ChunkCache 
+        implements CacheLoader<ChunkCache.Key, ChunkCache.Buffer>, 
RemovalListener<ChunkCache.Key, ChunkCache.Buffer>, CacheSize
+{
+    public static final int RESERVED_POOL_SPACE_IN_MB = 32;
+    public static final long cacheSize = 1024L * 1024L * Math.max(0, 
DatabaseDescriptor.getFileCacheSizeInMB() - RESERVED_POOL_SPACE_IN_MB);
+
+    private static boolean enabled = cacheSize > 0;
+    public static final ChunkCache instance = enabled ? new ChunkCache() : 
null;
+
+    private final LoadingCache<Key, Buffer> cache;
+    public final CacheMissMetrics metrics;
+
+    static class Key
+    {
+        final ChunkReader file;
+        final String path;
+        final long position;
+
+        public Key(ChunkReader file, long position)
+        {
+            super();
+            this.file = file;
+            this.position = position;
+            this.path = file.channel().filePath();
+        }
+
+        public int hashCode()
+        {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + path.hashCode();
+            result = prime * result + file.getClass().hashCode();
+            result = prime * result + Long.hashCode(position);
+            return result;
+        }
+
+        public boolean equals(Object obj)
+        {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+
+            Key other = (Key) obj;
+            return (position == other.position)
+                    && file.getClass() == other.file.getClass()
+                    && path.equals(other.path);
+        }
+    }
+
+    static class Buffer implements Rebufferer.BufferHolder
+    {
+        private final ByteBuffer buffer;
+        private final long offset;
+        private final AtomicInteger references;
+
+        public Buffer(ByteBuffer buffer, long offset)
+        {
+            this.buffer = buffer;
+            this.offset = offset;
+            references = new AtomicInteger(1);  // start referenced.
+        }
+
+        Buffer reference()
+        {
+            int refCount;
+            do
+            {
+                refCount = references.get();
+                if (refCount == 0)
+                    // Buffer was released before we managed to reference it. 
+                    return null;
+            } while (!references.compareAndSet(refCount, refCount + 1));
+
+            return this;
+        }
+
+        @Override
+        public ByteBuffer buffer()
+        {
+            assert references.get() > 0;
+            return buffer.duplicate();
+        }
+
+        @Override
+        public long offset()
+        {
+            return offset;
+        }
+
+        @Override
+        public void release()
+        {
+            if (references.decrementAndGet() == 0)
+                BufferPool.put(buffer);
+        }
+    }
+
+    public ChunkCache()
+    {
+        cache = Caffeine.newBuilder()
+                .maximumWeight(cacheSize)
+                .executor(MoreExecutors.directExecutor())
+                .weigher((key, buffer) -> ((Buffer) buffer).buffer.capacity())
+                .removalListener(this)
+                .build(this);
+        metrics = new CacheMissMetrics("ChunkCache", this);
+    }
+
+    @Override
+    public Buffer load(Key key) throws Exception
+    {
+        ChunkReader rebufferer = key.file;
+        metrics.misses.mark();
+        try (Timer.Context ctx = metrics.missLatency.time())
+        {
+            ByteBuffer buffer = BufferPool.get(key.file.chunkSize());
+            assert buffer != null;
+            rebufferer.readChunk(key.position, buffer);
+            return new Buffer(buffer, key.position);
+        }
+    }
+
+    @Override
+    public void onRemoval(Key key, Buffer buffer, RemovalCause cause)
+    {
+        buffer.release();
+    }
+
+    public void close()
+    {
+        cache.invalidateAll();
+    }
+
+    public RebuffererFactory wrap(ChunkReader file)
+    {
+        return new CachingRebufferer(file);
+    }
+
+    public static RebuffererFactory maybeWrap(ChunkReader file)
+    {
+        if (!enabled)
+            return file;
+
+        return instance.wrap(file);
+    }
+
+    public void invalidatePosition(SegmentedFile dfile, long position)
+    {
+        if (!(dfile.rebuffererFactory() instanceof CachingRebufferer))
+            return;
+
+        ((CachingRebufferer) dfile.rebuffererFactory()).invalidate(position);
+    }
+
+    public void invalidateFile(String fileName)
+    {
+        cache.invalidateAll(Iterables.filter(cache.asMap().keySet(), x -> 
x.path.equals(fileName)));
+    }
+
+    @VisibleForTesting
+    public void enable(boolean enabled)
+    {
+        ChunkCache.enabled = enabled;
+        cache.invalidateAll();
+        metrics.reset();
+    }
+
+    // TODO: Invalidate caches for obsoleted/MOVED_START tables?
+
+    /**
+     * Rebufferer providing cached chunks where data is obtained from the 
specified ChunkReader.
+     * Thread-safe. One instance per SegmentedFile, created by 
ChunkCache.maybeWrap if the cache is enabled.
+     */
+    class CachingRebufferer implements Rebufferer, RebuffererFactory
+    {
+        private final ChunkReader source;
+        final long alignmentMask;
+
+        public CachingRebufferer(ChunkReader file)
+        {
+            source = file;
+            int chunkSize = file.chunkSize();
+            assert Integer.bitCount(chunkSize) == 1;    // Must be power of two
+            alignmentMask = -chunkSize;
+        }
+
+        @Override
+        public Buffer rebuffer(long position)
+        {
+            try
+            {
+                metrics.requests.mark();
+                long pageAlignedPos = position & alignmentMask;
+                Buffer buf;
+                do
+                    buf = cache.get(new Key(source, 
pageAlignedPos)).reference();
+                while (buf == null);
+
+                return buf;
+            }
+            catch (Throwable t)
+            {
+                Throwables.propagateIfInstanceOf(t.getCause(), 
CorruptSSTableException.class);
+                throw Throwables.propagate(t);
+            }
+        }
+
+        public void invalidate(long position)
+        {
+            long pageAlignedPos = position & alignmentMask;
+            cache.invalidate(new Key(source, pageAlignedPos));
+        }
+
+        @Override
+        public Rebufferer instantiateRebufferer()
+        {
+            return this;
+        }
+
+        @Override
+        public void close()
+        {
+            source.close();
+        }
+
+        @Override
+        public void closeReader()
+        {
+            // Instance is shared among readers. Nothing to release.
+        }
+
+        @Override
+        public ChannelProxy channel()
+        {
+            return source.channel();
+        }
+
+        @Override
+        public long fileLength()
+        {
+            return source.fileLength();
+        }
+
+        @Override
+        public double getCrcCheckChance()
+        {
+            return source.getCrcCheckChance();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "CachingRebufferer:" + source.toString();
+        }
+    }
+
+    @Override
+    public long capacity()
+    {
+        return cacheSize;
+    }
+
+    @Override
+    public void setCapacity(long capacity)
+    {
+        throw new UnsupportedOperationException("Chunk cache size cannot be 
changed.");
+    }
+
+    @Override
+    public int size()
+    {
+        return cache.asMap().size();
+    }
+
+    @Override
+    public long weightedSize()
+    {
+        return cache.policy().eviction()
+                .map(policy -> 
policy.weightedSize().orElseGet(cache::estimatedSize))
+                .orElseGet(cache::estimatedSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/cache/ICache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ICache.java 
b/src/java/org/apache/cassandra/cache/ICache.java
index 37b55cd..7ca6b2e 100644
--- a/src/java/org/apache/cassandra/cache/ICache.java
+++ b/src/java/org/apache/cassandra/cache/ICache.java
@@ -24,12 +24,8 @@ import java.util.Iterator;
  * and does not require put or remove to return values, which lets 
SerializingCache
  * be more efficient by avoiding deserialize except on get.
  */
-public interface ICache<K, V>
+public interface ICache<K, V> extends CacheSize
 {
-    public long capacity();
-
-    public void setCapacity(long capacity);
-
     public void put(K key, V value);
 
     public boolean putIfAbsent(K key, V value);
@@ -40,10 +36,6 @@ public interface ICache<K, V>
 
     public void remove(K key);
 
-    public int size();
-
-    public long weightedSize();
-
     public void clear();
 
     public Iterator<K> keyIterator();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 67d351a..2d70da6 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -443,7 +443,7 @@ public class Scrubber implements Closeable
             }
             catch (Exception e)
             {
-                throw new RuntimeException();
+                throw new RuntimeException(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java 
b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 095d7f4..8bb5b6d 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataPosition;
-import org.apache.cassandra.io.util.RandomAccessReader;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.memory.BufferPool;
 
 /**
 * A {@link RandomAccessReader} wrapper that calculates the CRC in place.
@@ -37,35 +39,55 @@ import org.apache.cassandra.io.util.RandomAccessReader;
  * corrupted sequence by reading a huge corrupted length of bytes via
  * via {@link 
org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
  */
-public class ChecksummedDataInput extends 
RandomAccessReader.RandomAccessReaderWithOwnChannel
+public class ChecksummedDataInput extends RebufferingInputStream
 {
     private final CRC32 crc;
     private int crcPosition;
     private boolean crcUpdateDisabled;
 
     private long limit;
-    private DataPosition limitMark;
+    private long limitMark;
+
+    protected long bufferOffset;
+    protected final ChannelProxy channel;
 
-    protected ChecksummedDataInput(Builder builder)
+    ChecksummedDataInput(ChannelProxy channel, BufferType bufferType)
     {
-        super(builder);
+        super(BufferPool.get(RandomAccessReader.DEFAULT_BUFFER_SIZE, 
bufferType));
 
         crc = new CRC32();
         crcPosition = 0;
         crcUpdateDisabled = false;
+        this.channel = channel;
+        bufferOffset = 0;
+        buffer.limit(0);
 
         resetLimit();
     }
 
-    @SuppressWarnings("resource")   // channel owned by 
RandomAccessReaderWithOwnChannel
+    ChecksummedDataInput(ChannelProxy channel)
+    {
+        this(channel, BufferType.OFF_HEAP);
+    }
+
+    @SuppressWarnings("resource")
     public static ChecksummedDataInput open(File file)
     {
-        return new Builder(new ChannelProxy(file)).build();
+        return new ChecksummedDataInput(new ChannelProxy(file));
+    }
+
+    public boolean isEOF()
+    {
+        return getPosition() == channel.size();
     }
 
-    protected void releaseBuffer()
+    /**
+     * Returns the position in the source file, which is different for 
getPosition() for compressed/encrypted files
+     * and may be imprecise.
+     */
+    public long getSourcePosition()
     {
-        super.releaseBuffer();
+        return getPosition();
     }
 
     public void resetCrc()
@@ -76,29 +98,34 @@ public class ChecksummedDataInput extends 
RandomAccessReader.RandomAccessReaderW
 
     public void limit(long newLimit)
     {
-        limit = newLimit;
-        limitMark = mark();
+        limitMark = getPosition();
+        limit = limitMark + newLimit;
+    }
+
+    /**
+     * Returns the exact position in the uncompressed view of the file.
+     */
+    protected long getPosition()
+    {
+        return bufferOffset + buffer.position();
     }
 
     public void resetLimit()
     {
         limit = Long.MAX_VALUE;
-        limitMark = null;
+        limitMark = -1;
     }
 
     public void checkLimit(int length) throws IOException
     {
-        if (limitMark == null)
-            return;
-
-        if ((bytesPastLimit() + length) > limit)
+        if (getPosition() + length > limit)
             throw new IOException("Digest mismatch exception");
     }
 
     public long bytesPastLimit()
     {
-        assert limitMark != null;
-        return bytesPastMark(limitMark);
+        assert limitMark != -1;
+        return getPosition() - limitMark;
     }
 
     public boolean checkCrc() throws IOException
@@ -134,13 +161,24 @@ public class ChecksummedDataInput extends 
RandomAccessReader.RandomAccessReaderW
     }
 
     @Override
-    public void reBuffer()
+    protected void reBuffer()
     {
+        Preconditions.checkState(buffer.remaining() == 0);
         updateCrc();
-        super.reBuffer();
+        bufferOffset += buffer.limit();
+
+        readBuffer();
+
         crcPosition = buffer.position();
     }
 
+    protected void readBuffer()
+    {
+        buffer.clear();
+        while ((channel.read(buffer, bufferOffset)) == 0) {}
+        buffer.flip();
+    }
+
     private void updateCrc()
     {
         if (crcPosition == buffer.position() || crcUpdateDisabled)
@@ -155,16 +193,20 @@ public class ChecksummedDataInput extends 
RandomAccessReader.RandomAccessReaderW
         crc.update(unprocessed);
     }
 
-    public static class Builder extends RandomAccessReader.Builder
+    @Override
+    public void close()
     {
-        public Builder(ChannelProxy channel)
-        {
-            super(channel);
-        }
+        BufferPool.put(buffer);
+        channel.close();
+    }
 
-        public ChecksummedDataInput build()
-        {
-            return new ChecksummedDataInput(this);
-        }
+    protected String getPath()
+    {
+        return channel.filePath();
+    }
+
+    public ChannelProxy getChannel()
+    {
+        return channel;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java 
b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index c43e46e..f584dd1 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -35,13 +35,11 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
     private volatile ByteBuffer compressedBuffer = null;
     private final ByteBuffer metadataBuffer = 
ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
 
-    public 
CompressedChecksummedDataInput(CompressedChecksummedDataInputBuilder builder)
+    public CompressedChecksummedDataInput(ChannelProxy channel, ICompressor 
compressor, long filePosition)
     {
-        super(builder);
-        assert regions == null;  //mmapped regions are not supported
-
-        compressor = builder.compressor;
-        filePosition = builder.position;
+        super(channel, compressor.preferredBufferType());
+        this.compressor = compressor;
+        this.filePosition = filePosition;
     }
 
     /**
@@ -53,7 +51,13 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
         return filePosition == channel.size() && buffer.remaining() == 0;
     }
 
-    protected void reBufferStandard()
+    public long getSourcePosition()
+    {
+        return filePosition;
+    }
+
+    @Override
+    protected void readBuffer()
     {
         metadataBuffer.clear();
         channel.read(metadataBuffer, filePosition);
@@ -70,7 +74,7 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
             {
                 BufferPool.put(compressedBuffer);
             }
-            compressedBuffer = allocateBuffer(bufferSize, 
compressor.preferredBufferType());
+            compressedBuffer = BufferPool.get(bufferSize, 
compressor.preferredBufferType());
         }
 
         compressedBuffer.clear();
@@ -79,12 +83,11 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
         compressedBuffer.rewind();
         filePosition += compressedSize;
 
-        bufferOffset += buffer.position();
         if (buffer.capacity() < uncompressedSize)
         {
             int bufferSize = uncompressedSize + (uncompressedSize / 20);
             BufferPool.put(buffer);
-            buffer = allocateBuffer(bufferSize, 
compressor.preferredBufferType());
+            buffer = BufferPool.get(bufferSize, 
compressor.preferredBufferType());
         }
 
         buffer.clear();
@@ -100,19 +103,11 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
         }
     }
 
-    protected void releaseBuffer()
-    {
-        super.releaseBuffer();
-        if (compressedBuffer != null)
-        {
-            BufferPool.put(compressedBuffer);
-            compressedBuffer = null;
-        }
-    }
-
-    protected void reBufferMmap()
+    @Override
+    public void close()
     {
-        throw new UnsupportedOperationException();
+        BufferPool.put(compressedBuffer);
+        super.close();
     }
 
     @SuppressWarnings("resource") // Closing the ChecksummedDataInput will 
close the underlying channel.
@@ -121,10 +116,7 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
         long position = input.getPosition();
         input.close();
 
-        CompressedChecksummedDataInputBuilder builder = new 
CompressedChecksummedDataInputBuilder(new ChannelProxy(input.getPath()));
-        builder.withPosition(position);
-        builder.withCompressor(compressor);
-        return builder.build();
+        return new CompressedChecksummedDataInput(new 
ChannelProxy(input.getPath()), compressor, position);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
 
b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
deleted file mode 100644
index 3452df8..0000000
--- 
a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.cassandra.hints;
-
-import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.ChannelProxy;
-
-public class CompressedChecksummedDataInputBuilder extends 
ChecksummedDataInput.Builder
-{
-    long position;
-    ICompressor compressor;
-
-    public CompressedChecksummedDataInputBuilder(ChannelProxy channel)
-    {
-        super(channel);
-        bufferType = null;
-    }
-
-    public ChecksummedDataInput build()
-    {
-        assert position >= 0;
-        assert compressor != null;
-        return new CompressedChecksummedDataInput(this);
-    }
-
-    public CompressedChecksummedDataInputBuilder withCompressor(ICompressor 
compressor)
-    {
-        this.compressor = compressor;
-        bufferType = compressor.preferredBufferType();
-        return this;
-    }
-
-    public CompressedChecksummedDataInputBuilder withPosition(long position)
-    {
-        this.position = position;
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java 
b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index 12b6bf2..7ecfbfe 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -43,12 +43,12 @@ public class EncryptedChecksummedDataInput extends 
ChecksummedDataInput
 
     private final EncryptionUtils.ChannelProxyReadChannel readChannel;
 
-    protected EncryptedChecksummedDataInput(Builder builder)
+    protected EncryptedChecksummedDataInput(ChannelProxy channel, Cipher 
cipher, ICompressor compressor, long filePosition)
     {
-        super(builder);
-        cipher = builder.cipher;
-        compressor = builder.compressor;
-        readChannel = new EncryptionUtils.ChannelProxyReadChannel(channel, 
builder.position);
+        super(channel);
+        this.cipher = cipher;
+        this.compressor = compressor;
+        readChannel = new EncryptionUtils.ChannelProxyReadChannel(channel, 
filePosition);
         assert cipher != null;
         assert compressor != null;
     }
@@ -59,10 +59,16 @@ public class EncryptedChecksummedDataInput extends 
ChecksummedDataInput
      */
     public boolean isEOF()
     {
-        return readChannel.getCurrentPosition() == channel.size() && 
buffer.remaining() == 0;
+        return getSourcePosition() == channel.size() && buffer.remaining() == 
0;
     }
 
-    protected void reBufferStandard()
+    public long getSourcePosition()
+    {
+        return readChannel.getCurrentPosition();
+    }
+
+    @Override
+    protected void readBuffer()
     {
         try
         {
@@ -79,40 +85,13 @@ public class EncryptedChecksummedDataInput extends 
ChecksummedDataInput
         }
     }
 
-    public static class Builder extends CompressedChecksummedDataInputBuilder
-    {
-        Cipher cipher;
-
-        public Builder(ChannelProxy channel)
-        {
-            super(channel);
-        }
-
-        public Builder withCipher(Cipher cipher)
-        {
-            this.cipher = cipher;
-            return this;
-        }
-
-        public ChecksummedDataInput build()
-        {
-            assert position >= 0;
-            assert compressor != null;
-            assert cipher != null;
-            return new EncryptedChecksummedDataInput(this);
-        }
-    }
-
+    @SuppressWarnings("resource")
     public static ChecksummedDataInput upgradeInput(ChecksummedDataInput 
input, Cipher cipher, ICompressor compressor)
     {
         long position = input.getPosition();
         input.close();
 
-        Builder builder = new Builder(new ChannelProxy(input.getPath()));
-        builder.withPosition(position);
-        builder.withCompressor(compressor);
-        builder.withCipher(cipher);
-        return builder.build();
+        return new EncryptedChecksummedDataInput(new 
ChannelProxy(input.getPath()), cipher, compressor, position);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java 
b/src/java/org/apache/cassandra/hints/HintsReader.java
index 0571af4..5e73805 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -111,7 +111,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
 
     void seek(long newPosition)
     {
-        input.seek(newPosition);
+        throw new UnsupportedOperationException("Hints are not seekable.");
     }
 
     public Iterator<Page> iterator()
@@ -149,12 +149,12 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
         @SuppressWarnings("resource")
         protected Page computeNext()
         {
-            CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, 
input.getFilePointer(), input.getPath());
+            CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, 
input.getSourcePosition(), input.getPath());
 
             if (input.isEOF())
                 return endOfData();
 
-            return new Page(input.getFilePointer());
+            return new Page(input.getSourcePosition());
         }
     }
 
@@ -177,7 +177,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
 
             do
             {
-                long position = input.getFilePointer();
+                long position = input.getSourcePosition();
 
                 if (input.isEOF())
                     return endOfData(); // reached EOF
@@ -267,7 +267,7 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
 
             do
             {
-                long position = input.getFilePointer();
+                long position = input.getSourcePosition();
 
                 if (input.isEOF())
                     return endOfData(); // reached EOF

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java 
b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
deleted file mode 100644
index 329d932..0000000
--- 
a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.compress;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.zip.Checksum;
-import java.util.function.Supplier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.Ints;
-
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.utils.memory.BufferPool;
-
-/**
- * CRAR extends RAR to transparently uncompress blocks from the file into 
RAR.buffer.  Most of the RAR
- * "read bytes from the buffer, rebuffering when necessary" machinery works 
unchanged after that.
- */
-public class CompressedRandomAccessReader extends RandomAccessReader
-{
-    private final CompressionMetadata metadata;
-
-    // we read the raw compressed bytes into this buffer, then move the 
uncompressed ones into super.buffer.
-    private ByteBuffer compressed;
-
-    // re-use single crc object
-    private final Checksum checksum;
-
-    // raw checksum bytes
-    private ByteBuffer checksumBytes;
-
-    @VisibleForTesting
-    public double getCrcCheckChance()
-    {
-        return metadata.parameters.getCrcCheckChance();
-    }
-
-    protected CompressedRandomAccessReader(Builder builder)
-    {
-        super(builder);
-        this.metadata = builder.metadata;
-        this.checksum = metadata.checksumType.newInstance();
-
-        if (regions == null)
-        {
-            compressed = 
allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()),
 bufferType);
-            checksumBytes = ByteBuffer.wrap(new byte[4]);
-        }
-    }
-
-    @Override
-    protected void releaseBuffer()
-    {
-        try
-        {
-            if (buffer != null)
-            {
-                BufferPool.put(buffer);
-                buffer = null;
-            }
-        }
-        finally
-        {
-            // this will always be null if using mmap access mode (unlike in 
parent, where buffer is set to a region)
-            if (compressed != null)
-            {
-                BufferPool.put(compressed);
-                compressed = null;
-            }
-        }
-    }
-
-    @Override
-    protected void reBufferStandard()
-    {
-        try
-        {
-            long position = current();
-            assert position < metadata.dataLength;
-
-            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
-
-            if (compressed.capacity() < chunk.length)
-            {
-                BufferPool.put(compressed);
-                compressed = allocateBuffer(chunk.length, bufferType);
-            }
-            else
-            {
-                compressed.clear();
-            }
-
-            compressed.limit(chunk.length);
-            if (channel.read(compressed, chunk.offset) != chunk.length)
-                throw new CorruptBlockException(getPath(), chunk);
-
-            compressed.flip();
-            buffer.clear();
-
-            try
-            {
-                metadata.compressor().uncompress(compressed, buffer);
-            }
-            catch (IOException e)
-            {
-                throw new CorruptBlockException(getPath(), chunk);
-            }
-            finally
-            {
-                buffer.flip();
-            }
-
-            if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
-            {
-                compressed.rewind();
-                metadata.checksumType.update( checksum, (compressed));
-
-                if (checksum(chunk) != (int) checksum.getValue())
-                    throw new CorruptBlockException(getPath(), chunk);
-
-                // reset checksum object back to the original (blank) state
-                checksum.reset();
-            }
-
-            // buffer offset is always aligned
-            bufferOffset = position & ~(buffer.capacity() - 1);
-            buffer.position((int) (position - bufferOffset));
-            // the length() can be provided at construction time, to override 
the true (uncompressed) length of the file;
-            // this is permitted to occur within a compressed segment, so we 
truncate validBufferBytes if we cross the imposed length
-            if (bufferOffset + buffer.limit() > length())
-                buffer.limit((int)(length() - bufferOffset));
-        }
-        catch (CorruptBlockException e)
-        {
-            throw new CorruptSSTableException(e, getPath());
-        }
-        catch (IOException e)
-        {
-            throw new FSReadError(e, getPath());
-        }
-    }
-
-    @Override
-    protected void reBufferMmap()
-    {
-        try
-        {
-            long position = current();
-            assert position < metadata.dataLength;
-
-            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
-
-            MmappedRegions.Region region = regions.floor(chunk.offset);
-            long segmentOffset = region.bottom();
-            int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
-            ByteBuffer compressedChunk = region.buffer.duplicate(); // TODO: 
change to slice(chunkOffset) when we upgrade LZ4-java
-
-            compressedChunk.position(chunkOffset).limit(chunkOffset + 
chunk.length);
-
-            buffer.clear();
-
-            try
-            {
-                metadata.compressor().uncompress(compressedChunk, buffer);
-            }
-            catch (IOException e)
-            {
-                throw new CorruptBlockException(getPath(), chunk);
-            }
-            finally
-            {
-                buffer.flip();
-            }
-
-            if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
-            {
-                compressedChunk.position(chunkOffset).limit(chunkOffset + 
chunk.length);
-
-                metadata.checksumType.update( checksum, compressedChunk);
-
-                compressedChunk.limit(compressedChunk.capacity());
-                if (compressedChunk.getInt() != (int) checksum.getValue())
-                    throw new CorruptBlockException(getPath(), chunk);
-
-                // reset checksum object back to the original (blank) state
-                checksum.reset();
-            }
-
-            // buffer offset is always aligned
-            bufferOffset = position & ~(buffer.capacity() - 1);
-            buffer.position((int) (position - bufferOffset));
-            // the length() can be provided at construction time, to override 
the true (uncompressed) length of the file;
-            // this is permitted to occur within a compressed segment, so we 
truncate validBufferBytes if we cross the imposed length
-            if (bufferOffset + buffer.limit() > length())
-                buffer.limit((int)(length() - bufferOffset));
-        }
-        catch (CorruptBlockException e)
-        {
-            throw new CorruptSSTableException(e, getPath());
-        }
-
-    }
-
-    private int checksum(CompressionMetadata.Chunk chunk) throws IOException
-    {
-        long position = chunk.offset + chunk.length;
-        checksumBytes.clear();
-        if (channel.read(checksumBytes, position) != checksumBytes.capacity())
-            throw new CorruptBlockException(getPath(), chunk);
-        return checksumBytes.getInt(0);
-    }
-
-    @Override
-    public long length()
-    {
-        return metadata.dataLength;
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("%s - chunk length %d, data length %d.", 
getPath(), metadata.chunkLength(), metadata.dataLength);
-    }
-
-    public final static class Builder extends RandomAccessReader.Builder
-    {
-        private final CompressionMetadata metadata;
-
-        public Builder(ICompressedFile file)
-        {
-            super(file.channel());
-            this.metadata = applyMetadata(file.getMetadata());
-            this.regions = file.regions();
-        }
-
-        public Builder(ChannelProxy channel, CompressionMetadata metadata)
-        {
-            super(channel);
-            this.metadata = applyMetadata(metadata);
-        }
-
-        private CompressionMetadata applyMetadata(CompressionMetadata metadata)
-        {
-            this.overrideLength = metadata.compressedFileLength;
-            this.bufferSize = metadata.chunkLength();
-            this.bufferType = metadata.compressor().preferredBufferType();
-
-            assert Integer.bitCount(this.bufferSize) == 1; //must be a power 
of two
-
-            return metadata;
-        }
-
-        @Override
-        protected ByteBuffer createBuffer()
-        {
-            buffer = allocateBuffer(bufferSize, bufferType);
-            buffer.limit(0);
-            return buffer;
-        }
-
-        @Override
-        public RandomAccessReader build()
-        {
-            return new CompressedRandomAccessReader(this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 3181a55..a77b673 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1637,7 +1637,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
      */
     public long uncompressedLength()
     {
-        return dfile.length;
+        return dfile.dataLength();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index bbb22d4..b6ea1d9 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.rows.*;
@@ -58,7 +60,9 @@ public class BigTableWriter extends SSTableWriter
     private final SegmentedFile.Builder dbuilder;
     protected final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
+
     private DataPosition dataMark;
+    private long lastEarlyOpenLength = 0;
 
     public BigTableWriter(Descriptor descriptor, 
                           Long keyCount, 
@@ -265,6 +269,7 @@ public class BigTableWriter extends SSTableWriter
         IndexSummary indexSummary = 
iwriter.summary.build(metadata.partitioner, boundary);
         SegmentedFile ifile = iwriter.builder.buildIndex(descriptor, 
indexSummary, boundary);
         SegmentedFile dfile = dbuilder.buildData(descriptor, stats, boundary);
+        invalidateCacheAtBoundary(dfile);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor,
                                                            components, 
metadata,
                                                            ifile, dfile, 
indexSummary,
@@ -276,6 +281,13 @@ public class BigTableWriter extends SSTableWriter
         return sstable;
     }
 
+    void invalidateCacheAtBoundary(SegmentedFile dfile)
+    {
+        if (ChunkCache.instance != null && lastEarlyOpenLength != 0 && 
dfile.dataLength() > lastEarlyOpenLength)
+            ChunkCache.instance.invalidatePosition(dfile, lastEarlyOpenLength);
+        lastEarlyOpenLength = dfile.dataLength();
+    }
+
     public SSTableReader openFinalEarly()
     {
         // we must ensure the data is completely flushed to disk
@@ -296,6 +308,7 @@ public class BigTableWriter extends SSTableWriter
         IndexSummary indexSummary = 
iwriter.summary.build(this.metadata.partitioner);
         SegmentedFile ifile = iwriter.builder.buildIndex(desc, indexSummary);
         SegmentedFile dfile = dbuilder.buildData(desc, stats);
+        invalidateCacheAtBoundary(dfile);
         SSTableReader sstable = SSTableReader.internalOpen(desc,
                                                            components,
                                                            this.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java 
b/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
new file mode 100644
index 0000000..5dc0d37
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public abstract class AbstractReaderFileProxy implements ReaderFileProxy
+{
+    protected final ChannelProxy channel;
+    protected final long fileLength;
+
+    public AbstractReaderFileProxy(ChannelProxy channel, long fileLength)
+    {
+        this.channel = channel;
+        this.fileLength = fileLength >= 0 ? fileLength : channel.size();
+    }
+
+    @Override
+    public ChannelProxy channel()
+    {
+        return channel;
+    }
+
+    @Override
+    public long fileLength()
+    {
+        return fileLength;
+    }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getSimpleName() + "(filePath='" + channel + "')";
+    }
+
+    @Override
+    public void close()
+    {
+        // nothing in base class
+    }
+
+    @Override
+    public double getCrcCheckChance()
+    {
+        return 0; // Only valid for compressed files.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java 
b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
new file mode 100644
index 0000000..0e67364
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -0,0 +1,127 @@
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.memory.BufferPool;
+
+/**
+ * Buffer manager used for reading from a ChunkReader when cache is not in 
use. Instances of this class are
+ * reader-specific and thus do not need to be thread-safe since the reader 
itself isn't.
+ *
+ * The instances reuse themselves as the BufferHolder to avoid having to 
return a new object for each rebuffer call.
+ */
+public abstract class BufferManagingRebufferer implements Rebufferer, 
Rebufferer.BufferHolder
+{
+    protected final ChunkReader source;
+    protected final ByteBuffer buffer;
+    // File position of the currently buffered chunk; set to -1 by closeReader() to mark this
+    // rebufferer closed (checked by the assert in close()).
+    protected long offset = 0;
+
+    // Factory: picks the aligned or unaligned implementation based on the source's requirements.
+    public static BufferManagingRebufferer on(ChunkReader wrapped)
+    {
+        return wrapped.alignmentRequired()
+             ? new Aligned(wrapped)
+             : new Unaligned(wrapped);
+    }
+
+    // Maps a requested file position to the chunk-start position passed to the source.
+    abstract long alignedPosition(long position);
+
+    public BufferManagingRebufferer(ChunkReader wrapped)
+    {
+        this.source = wrapped;
+        buffer = RandomAccessReader.allocateBuffer(wrapped.chunkSize(), 
wrapped.preferredBufferType());
+        buffer.limit(0);    // start empty; the first rebuffer() call fills it
+    }
+
+    @Override
+    public void closeReader()
+    {
+        BufferPool.put(buffer);
+        offset = -1;
+    }
+
+    @Override
+    public void close()
+    {
+        assert offset == -1;    // reader must be closed at this point.
+        source.close();
+    }
+
+    @Override
+    public ChannelProxy channel()
+    {
+        return source.channel();
+    }
+
+    @Override
+    public long fileLength()
+    {
+        return source.fileLength();
+    }
+
+    // Reads the chunk containing the given position and returns this object as the holder,
+    // avoiding a new BufferHolder allocation per rebuffer call (see class javadoc).
+    @Override
+    public BufferHolder rebuffer(long position)
+    {
+        offset = alignedPosition(position);
+        source.readChunk(offset, buffer);
+        return this;
+    }
+
+    @Override
+    public double getCrcCheckChance()
+    {
+        return source.getCrcCheckChance();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "BufferManagingRebufferer." + getClass().getSimpleName() + ":" 
+ source.toString();
+    }
+
+    // BufferHolder methods
+
+    public ByteBuffer buffer()
+    {
+        return buffer;
+    }
+
+    public long offset()
+    {
+        return offset;
+    }
+
+    @Override
+    public void release()
+    {
+        // nothing to do, we don't delete buffers before we're closed.
+    }
+
+    // Passes positions through unchanged, for sources with no alignment requirement.
+    public static class Unaligned extends BufferManagingRebufferer
+    {
+        public Unaligned(ChunkReader wrapped)
+        {
+            super(wrapped);
+        }
+
+        @Override
+        long alignedPosition(long position)
+        {
+            return position;
+        }
+    }
+
+    // Rounds positions down to chunk boundaries, for sources that require aligned reads.
+    public static class Aligned extends BufferManagingRebufferer
+    {
+        public Aligned(ChunkReader wrapped)
+        {
+            super(wrapped);
+            assert Integer.bitCount(wrapped.chunkSize()) == 1;    // power of two, required by the mask below
+        }
+
+        @Override
+        long alignedPosition(long position)
+        {
+            // buffer.capacity() == chunkSize is a power of two, so AND-ing with its negation
+            // rounds the position down to a chunk boundary.
+            return position & -buffer.capacity();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java 
b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 090c5bd..a46ec14 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -17,11 +17,19 @@
  */
 package org.apache.cassandra.io.util;
 
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.io.compress.BufferType;
+
 public class BufferedSegmentedFile extends SegmentedFile
 {
     public BufferedSegmentedFile(ChannelProxy channel, int bufferSize, long 
length)
     {
-        super(new Cleanup(channel), channel, bufferSize, length);
+        this(channel, createRebufferer(channel, length, bufferSize), length);
+    }
+
+    private BufferedSegmentedFile(ChannelProxy channel, RebuffererFactory 
rebufferer, long length)
+    {
+        super(new Cleanup(channel, rebufferer), channel, rebufferer, length);
     }
 
     private BufferedSegmentedFile(BufferedSegmentedFile copy)
@@ -29,6 +37,11 @@ public class BufferedSegmentedFile extends SegmentedFile
         super(copy);
     }
 
+    private static RebuffererFactory createRebufferer(ChannelProxy channel, 
long length, int bufferSize)
+    {
+        return ChunkCache.maybeWrap(new SimpleChunkReader(channel, length, 
BufferType.OFF_HEAP, bufferSize));
+    }
+
     public static class Builder extends SegmentedFile.Builder
     {
         public SegmentedFile complete(ChannelProxy channel, int bufferSize, 
long overrideLength)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/ChannelProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChannelProxy.java 
b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
index f866160..361b0d3 100644
--- a/src/java/org/apache/cassandra/io/util/ChannelProxy.java
+++ b/src/java/org/apache/cassandra/io/util/ChannelProxy.java
@@ -125,6 +125,7 @@ public final class ChannelProxy extends SharedCloseableImpl
     {
         try
         {
+            // FIXME: consider wrapping in a while loop
             return channel.read(buffer, position);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java 
b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 30f1e0c..25ef615 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -19,13 +19,13 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.ChecksumType;
 
-public class ChecksummedRandomAccessReader extends RandomAccessReader
+public class ChecksummedRandomAccessReader
 {
     @SuppressWarnings("serial")
     public static class CorruptFileException extends RuntimeException
@@ -39,67 +39,56 @@ public class ChecksummedRandomAccessReader extends 
RandomAccessReader
         }
     }
 
-    private final DataIntegrityMetadata.ChecksumValidator validator;
-
-    private ChecksummedRandomAccessReader(Builder builder)
-    {
-        super(builder);
-        this.validator = builder.validator;
-    }
-
-    @SuppressWarnings("resource")
-    @Override
-    protected void reBufferStandard()
+    static class ChecksummedRebufferer extends BufferManagingRebufferer
     {
-        long desiredPosition = current();
-        // align with buffer size, as checksums were computed in chunks of 
buffer size each.
-        bufferOffset = (desiredPosition / buffer.capacity()) * 
buffer.capacity();
-
-        buffer.clear();
+        private final DataIntegrityMetadata.ChecksumValidator validator;
 
-        long position = bufferOffset;
-        while (buffer.hasRemaining())
+        public ChecksummedRebufferer(ChannelProxy channel, ChecksumValidator 
validator)
         {
-            int n = channel.read(buffer, position);
-            if (n < 0)
-                break;
-            position += n;
+            super(new SimpleChunkReader(channel, channel.size(), 
BufferType.ON_HEAP, validator.chunkSize));
+            this.validator = validator;
         }
 
-        buffer.flip();
-
-        try
+        @Override
+        public BufferHolder rebuffer(long desiredPosition)
         {
-            validator.validate(ByteBufferUtil.getArray(buffer), 0, 
buffer.remaining());
+            if (desiredPosition != offset + buffer.position())
+                validator.seek(desiredPosition);
+
+            // align with buffer size, as checksums were computed in chunks of 
buffer size each.
+            offset = alignedPosition(desiredPosition);
+            source.readChunk(offset, buffer);
+
+            try
+            {
+                validator.validate(ByteBufferUtil.getArray(buffer), 0, 
buffer.remaining());
+            }
+            catch (IOException e)
+            {
+                throw new CorruptFileException(e, channel().filePath());
+            }
+
+            return this;
         }
-        catch (IOException e)
+
+        @Override
+        public void close()
         {
-            throw new CorruptFileException(e, channel.filePath());
+            try
+            {
+                source.close();
+            }
+            finally
+            {
+                validator.close();
+            }
         }
 
-        buffer.position((int) (desiredPosition - bufferOffset));
-    }
-
-    @Override
-    protected void reBufferMmap()
-    {
-        throw new AssertionError("Unsupported operation");
-    }
-
-    @Override
-    public void seek(long newPosition)
-    {
-        validator.seek(newPosition);
-        super.seek(newPosition);
-    }
-
-    @Override
-    public void close()
-    {
-        Throwables.perform(channel.filePath(), Throwables.FileOpType.READ,
-                           super::close,
-                           validator::close,
-                           channel::close);
+        @Override
+        long alignedPosition(long desiredPosition)
+        {
+            return (desiredPosition / buffer.capacity()) * buffer.capacity();
+        }
     }
 
     public static final class Builder extends RandomAccessReader.Builder
@@ -110,18 +99,22 @@ public class ChecksummedRandomAccessReader extends 
RandomAccessReader
         public Builder(File file, File crcFile) throws IOException
         {
             super(new ChannelProxy(file));
-            this.validator = new DataIntegrityMetadata.ChecksumValidator(new 
CRC32(),
+            this.validator = new 
DataIntegrityMetadata.ChecksumValidator(ChecksumType.CRC32,
                                                                          
RandomAccessReader.open(crcFile),
                                                                          
file.getPath());
+        }
 
-            super.bufferSize(validator.chunkSize)
-                 .bufferType(BufferType.ON_HEAP);
+        @Override
+        protected Rebufferer createRebufferer()
+        {
+            return new ChecksummedRebufferer(channel, validator);
         }
 
         @Override
         public RandomAccessReader build()
         {
-            return new ChecksummedRandomAccessReader(this);
+            // Always own and close the channel.
+            return buildWithChannel();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/ChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChunkReader.java 
b/src/java/org/apache/cassandra/io/util/ChunkReader.java
new file mode 100644
index 0000000..a04299a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChunkReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.compress.BufferType;
+
+/**
+ * RandomAccessReader component that reads data from a file into a provided buffer
+ * and may have requirements over the size and alignment of reads.
+ * A caching or buffer-managing rebufferer will reference one of these to do the actual reading.
+ * Note: Implementations of this interface must be thread-safe!
+ */
+public interface ChunkReader extends RebuffererFactory
+{
+    /**
+     * Read the chunk at the given position, attempting to fill the capacity of the given buffer.
+     * The filled buffer must be positioned at 0, with limit set at the size of the available data.
+     * The source may have requirements for the positioning and/or size of the buffer
+     * (e.g. chunk-aligned and chunk-sized). These must be satisfied by the caller.
+     */
+    void readChunk(long position, ByteBuffer buffer);
+
+    /**
+     * Buffer size required for this rebufferer. Must be a power of 2 if alignment is required.
+     */
+    int chunkSize();
+
+    /**
+     * If true, positions passed to this rebufferer must be aligned to chunkSize.
+     */
+    boolean alignmentRequired();
+
+    /**
+     * Specifies the type of buffer the caller should attempt to give.
+     * This is not guaranteed to be fulfilled.
+     */
+    BufferType preferredBufferType();
+}
\ No newline at end of file

Reply via email to