netudima commented on code in PR #3606:
URL: https://github.com/apache/cassandra/pull/3606#discussion_r1938584331


##########
src/java/org/apache/cassandra/io/util/CompressedChunkReader.java:
##########
@@ -83,20 +90,167 @@ public BufferType preferredBufferType()
     }
 
     @Override
-    public Rebufferer instantiateRebufferer()
+    public Rebufferer instantiateRebufferer(boolean isScan)
     {
-        return new BufferManagingRebufferer.Aligned(this);
+        return new BufferManagingRebufferer.Aligned(isScan ? forScan() : this);
     }
 
-    public static class Standard extends CompressedChunkReader
+    protected interface CompressedReader extends Closeable
+    {
+        default void allocateResources()
+        {
+        }
+
+        default void deallocateResources()
+        {
+        }
+
+        default boolean allocated()
+        {
+            return false;
+        }
+
+        default void close()
+        {
+
+        }
+
+
+        ByteBuffer read(CompressionMetadata.Chunk chunk, boolean 
shouldCheckCrc) throws CorruptBlockException;
+    }
+
+    private static class RandomAccessCompressedReader implements 
CompressedReader
+    {
+        private final ChannelProxy channel;
+        private final ThreadLocalByteBufferHolder bufferHolder;
+
+        private RandomAccessCompressedReader(ChannelProxy channel, 
CompressionMetadata metadata)
+        {
+            this.channel = channel;
+            this.bufferHolder = new 
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+        }
+
+        @Override
+        public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean 
shouldCheckCrc) throws CorruptBlockException
+        {
+            int length = shouldCheckCrc ? chunk.length + Integer.BYTES // 
compressed length + checksum length
+                                        : chunk.length;
+            ByteBuffer compressed = bufferHolder.getBuffer(length);
+            if (channel.read(compressed, chunk.offset) != length)
+                throw new CorruptBlockException(channel.filePath(), chunk);
+            compressed.flip();
+            compressed.limit(chunk.length);
+
+            if (shouldCheckCrc)
+            {
+                int checksum = (int) ChecksumType.CRC32.of(compressed);
+                compressed.limit(length);
+                if (compressed.getInt() != checksum)
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                compressed.position(0).limit(chunk.length);
+            }
+            return compressed;
+        }
+    }
+
+    private static class ScanCompressedReader implements CompressedReader
     {
-        // we read the raw compressed bytes into this buffer, then 
uncompressed them into the provided one.
+        private final ChannelProxy channel;
         private final ThreadLocalByteBufferHolder bufferHolder;
+        private final ThreadLocalReadAheadBuffer readAheadBuffer;
+
+        private ScanCompressedReader(ChannelProxy channel, CompressionMetadata 
metadata, int readAheadBufferSize)
+        {
+            this.channel = channel;
+            this.bufferHolder = new 
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+            this.readAheadBuffer = new ThreadLocalReadAheadBuffer(channel, 
readAheadBufferSize, metadata.compressor().preferredBufferType());
+        }
+
+        @Override
+        public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean 
shouldCheckCrc) throws CorruptBlockException
+        {
+            int length = shouldCheckCrc ? chunk.length + Integer.BYTES // 
compressed length + checksum length
+                                        : chunk.length;
+            ByteBuffer compressed = bufferHolder.getBuffer(length);
+
+            int copied = 0;
+            while (copied < length) {

Review Comment:
   the last idea, please fill free to ignore it :-)
   we can move the whole loop into the readAheadBuffer logic and do a single 
thread local lookup for a block in such case (currently we do 3 or 4 lookups 
per loop iteration). It also will remove a duplicated code from a test.
   like:
   ```
       public void read(ByteBuffer target, long offset, int length)
       {
           int copied = 0;
           Block block = getBlock();
           while (copied < length)
           {
               fill(block, offset + copied);
               int leftToRead = length - copied;
               if (block.buffer.remaining() >= leftToRead)
                   copied += read(block, target, leftToRead);
               else
                   copied += read(block, target, block.buffer.remaining());
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to