netudima commented on code in PR #3606: URL: https://github.com/apache/cassandra/pull/3606#discussion_r1938584331
########## src/java/org/apache/cassandra/io/util/CompressedChunkReader.java: ########## @@ -83,20 +90,167 @@ public BufferType preferredBufferType() } @Override - public Rebufferer instantiateRebufferer() + public Rebufferer instantiateRebufferer(boolean isScan) { - return new BufferManagingRebufferer.Aligned(this); + return new BufferManagingRebufferer.Aligned(isScan ? forScan() : this); } - public static class Standard extends CompressedChunkReader + protected interface CompressedReader extends Closeable + { + default void allocateResources() + { + } + + default void deallocateResources() + { + } + + default boolean allocated() + { + return false; + } + + default void close() + { + + } + + + ByteBuffer read(CompressionMetadata.Chunk chunk, boolean shouldCheckCrc) throws CorruptBlockException; + } + + private static class RandomAccessCompressedReader implements CompressedReader + { + private final ChannelProxy channel; + private final ThreadLocalByteBufferHolder bufferHolder; + + private RandomAccessCompressedReader(ChannelProxy channel, CompressionMetadata metadata) + { + this.channel = channel; + this.bufferHolder = new ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType()); + } + + @Override + public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean shouldCheckCrc) throws CorruptBlockException + { + int length = shouldCheckCrc ? 
chunk.length + Integer.BYTES // compressed length + checksum length : chunk.length; ByteBuffer compressed = bufferHolder.getBuffer(length); if (channel.read(compressed, chunk.offset) != length) throw new CorruptBlockException(channel.filePath(), chunk); compressed.flip(); compressed.limit(chunk.length); if (shouldCheckCrc) { int checksum = (int) ChecksumType.CRC32.of(compressed); compressed.limit(length); if (compressed.getInt() != checksum) throw new CorruptBlockException(channel.filePath(), chunk); compressed.position(0).limit(chunk.length); } return compressed; } } private static class ScanCompressedReader implements CompressedReader { - // we read the raw compressed bytes into this buffer, then uncompressed them into the provided one. private final ChannelProxy channel; private final ThreadLocalByteBufferHolder bufferHolder; private final ThreadLocalReadAheadBuffer readAheadBuffer; private ScanCompressedReader(ChannelProxy channel, CompressionMetadata metadata, int readAheadBufferSize) { this.channel = channel; this.bufferHolder = new ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType()); this.readAheadBuffer = new ThreadLocalReadAheadBuffer(channel, readAheadBufferSize, metadata.compressor().preferredBufferType()); } @Override public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean shouldCheckCrc) throws CorruptBlockException { int length = shouldCheckCrc ? chunk.length + Integer.BYTES // compressed length + checksum length : chunk.length; ByteBuffer compressed = bufferHolder.getBuffer(length); int copied = 0; while (copied < length) { Review Comment: the last idea, please feel free to ignore it :-) we can move the whole loop into the readAheadBuffer logic and do a single thread local lookup for a block in such case (currently we do 3 or 4 lookups per loop iteration) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org