aweisberg commented on code in PR #4178:
URL: https://github.com/apache/cassandra/pull/4178#discussion_r2159507957


##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -3242,6 +3283,11 @@ public static void initializeCommitLogDiskAccessMode()
         commitLogWriteDiskAccessMode = accessModeDirectIoPair.left;
     }
 
+    public static void initializeCompactionScanDiskAccessMode()
+    {
+        compactionScanDiskAccessMode = 
resolveCompactionScanDiskAccessMode(conf.disk_access_mode, 
conf.compaction_scan_disk_access_mode);

Review Comment:
   Inline `resolveCompactionScanDiskAccessMode` here?



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java:
##########
@@ -581,4 +583,10 @@ public boolean supportsEarlyOpen()
     {
         return true;
     }
+
+    protected static DiskAccessMode compactionScanDiskAccessMode()

Review Comment:
   Does this need to wrap `DatabaseDescriptor`? Just have every caller 
statically import and call `getCompactionScanDiskAccessMode`.



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1402,7 +1418,27 @@ public RandomAccessReader openDataReader()
 
     public RandomAccessReader openDataReaderForScan()
     {
-        return dfile.createReaderForScan();
+        return openDataReaderForScan(dfile.diskAccessMode());
+    }
+
+    public RandomAccessReader openDataReaderForScan(DiskAccessMode 
diskAccessMode)
+    {
+        if (diskAccessMode == dfile.diskAccessMode())
+        {
+            return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);

Review Comment:
   Since the consequence of this condition and the next one is the same, maybe 
they should be a single condition? You could assign each condition to a local 
boolean and then have a single `if` on both booleans.
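
A rough sketch of the shape suggested here, not the PR's code. The `Mode` enum stands in for `DiskAccessMode`, the parameters stand in for `dfile.diskAccessMode()` and `directIOSupported`, and the names `ScanReaderChoice` and `reuseExistingFile` are hypothetical:

```java
final class ScanReaderChoice
{
    // Stand-in for DiskAccessMode; only the values needed for this sketch.
    enum Mode { standard, mmap, direct }

    /**
     * Returns true when the scan can reuse the already-open data file,
     * false when a separate file handle would have to be built.
     */
    static boolean reuseExistingFile(Mode requested, Mode current, boolean directIOSupported)
    {
        // Each condition gets a name, then a single branch covers both.
        boolean sameMode = requested == current;
        boolean directUnavailable = requested == Mode.direct && !directIOSupported;
        return sameMode || directUnavailable;
    }
}
```

With that in place the two identical `return dfile.createReaderForScan(...)` bodies would collapse into one `if`.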



##########
src/java/org/apache/cassandra/io/util/RandomAccessReader.java:
##########
@@ -319,6 +319,34 @@ public void close()
         }
     }
 
+    static class RandomAccessReaderWithOwnFile extends RandomAccessReader
+    {
+
+        private final FileHandle fileHandle;
+
+        RandomAccessReaderWithOwnFile(Rebufferer rebufferer, FileHandle 
fileHandle)
+        {
+            super(rebufferer);
+            this.fileHandle = fileHandle;
+        }
+
+        @Override
+        public void close()
+        {
+            try
+            {
+                super.close();
+            }
+            finally
+            {
+                if (this.fileHandle != null)

Review Comment:
   When would we create a reader with a null file handle? Should that be a 
precondition in the constructor?
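
A minimal sketch of the constructor precondition, assuming the handle is never legitimately null. `OwnedHandleReader` is a hypothetical stand-in for `RandomAccessReaderWithOwnFile`, and `AutoCloseable` stands in for `FileHandle`:

```java
import java.util.Objects;

final class OwnedHandleReader implements AutoCloseable
{
    // Stand-in for the FileHandle owned by the reader.
    private final AutoCloseable fileHandle;

    OwnedHandleReader(AutoCloseable fileHandle)
    {
        // Fail fast at construction so close() never needs a null check.
        this.fileHandle = Objects.requireNonNull(fileHandle, "fileHandle");
    }

    @Override
    public void close() throws Exception
    {
        fileHandle.close();
    }
}
```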



##########
src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+import org.agrona.BitUtil;
+import org.agrona.BufferUtil;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+
+public final class DirectThreadLocalReadAheadBuffer extends 
ThreadLocalReadAheadBuffer
+{
+
+    private final int blockSize;
+
+    private DirectThreadLocalReadAheadBuffer(ChannelProxy channel, int 
bufferSize, int blockSize)
+    {
+        super(channel, bufferSize, () -> 
BufferUtil.allocateDirectAligned(bufferSize, blockSize));
+        this.blockSize = blockSize;
+    }
+
+    @Override
+    protected void loadBlock(ByteBuffer blockBuffer, long blockPosition, int 
sizeToRead)
+    {
+        int alignedSizeToRead = BitUtil.align(sizeToRead, blockSize);

Review Comment:
   When reading, is it only required to align the end of the read to a block? 
Does the start of the read also need to be block aligned?
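
To illustrate the question: my understanding is that direct IO generally wants the file offset as well as the length to be multiples of the block size, but that should be checked against the platform documentation. A sketch of widening a read so both ends are aligned, assuming a power-of-two block size (`AlignedRange` and `cover` are hypothetical names, not part of this PR):

```java
final class AlignedRange
{
    final long start; // inclusive, multiple of blockSize
    final long end;   // exclusive, multiple of blockSize

    private AlignedRange(long start, long end)
    {
        this.start = start;
        this.end = end;
    }

    /** Smallest block-aligned range that covers [position, position + length). */
    static AlignedRange cover(long position, int length, int blockSize)
    {
        if (blockSize <= 0 || Integer.bitCount(blockSize) != 1)
            throw new IllegalArgumentException("blockSize must be a positive power of two: " + blockSize);

        long mask = blockSize - 1L;
        long start = position & ~mask;                 // round the start down
        long end = (position + length + mask) & ~mask; // round the end up
        return new AlignedRange(start, end);
    }
}
```

The caller would then read `end - start` bytes at `start` and skip the first `position - start` bytes of the result. Clamping `end` to the file size is left out of the sketch.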



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -472,6 +475,9 @@ protected SSTableReader(Builder<?, ?> builder, Owner owner)
         this.sstableMetadata = builder.getStatsMetadata();
         this.header = builder.getSerializationHeader();
         this.dfile = builder.getDataFile();
+        this.directIOSupported = FileUtils.isDirectIOSupported(dfile.file())

Review Comment:
   Just curious: in what situations does a `FileStore` not report a block size?
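
For context, the JDK API does allow for this: `FileStore.getBlockSize()` (Java 10+) is documented to throw `UnsupportedOperationException` when the operation is not supported. A minimal probe is sketched below; `BlockSizeProbe` is a hypothetical name, and this is not a claim about how `FileUtils.isDirectIOSupported` is implemented:

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

final class BlockSizeProbe
{
    /** Returns the block size of the store holding the path, or empty if it cannot be determined. */
    static OptionalLong blockSize(Path path)
    {
        try
        {
            FileStore store = Files.getFileStore(path);
            return OptionalLong.of(store.getBlockSize());
        }
        catch (UnsupportedOperationException | IOException e)
        {
            // The store does not report a block size, or the path could not be resolved.
            return OptionalLong.empty();
        }
    }
}
```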



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1064,12 +1070,17 @@ public ISSTableScanner getScanner(Range<Token> range)
      * @return A Scanner over the full content of the SSTable.
      */
     public ISSTableScanner getScanner()
+    {
+        return getScanner(dfile.diskAccessMode());
+    }
+
+    public ISSTableScanner getScanner(DiskAccessMode diskAccessMode)
     {
         PartitionPositionBounds fullRange = getPositionsForFullRange();
         if (fullRange != null)
-            return new SSTableSimpleScanner(this, 
Collections.singletonList(fullRange));
+            return new SSTableSimpleScanner(this, 
Collections.singletonList(fullRange), diskAccessMode);

Review Comment:
   Can the scanner get the disk access mode from the `dfile` or `SSTableReader` 
directly instead of passing it in as a parameter since `this` is passed in?
   
   That would simplify a little.



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1402,7 +1418,27 @@ public RandomAccessReader openDataReader()
 
     public RandomAccessReader openDataReaderForScan()
     {
-        return dfile.createReaderForScan();
+        return openDataReaderForScan(dfile.diskAccessMode());
+    }
+
+    public RandomAccessReader openDataReaderForScan(DiskAccessMode 
diskAccessMode)
+    {
+        if (diskAccessMode == dfile.diskAccessMode())
+        {
+            return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
+        }
+
+        if (diskAccessMode == DiskAccessMode.direct && !directIOSupported)
+        {
+            return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
+        }
+
+        //noinspection resource - The FileHandle lifecycle is managed by the 
returned RandomAccessReader
+        FileHandle dataFile = dfile.toBuilder()

Review Comment:
   Is this exception safe, or can it leak a file handle? Looking at the callers 
of `FileHandle.complete`, it looks like we are just not exception safe in 
general around opening files.
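
One generic way to close that gap, sketched with plain JDK types rather than Cassandra's. `SafeWrap.wrapOrClose` is a hypothetical helper: the resource stands in for the freshly built `FileHandle` and the wrapper for the code that hands it to the reader:

```java
import java.util.function.Function;

final class SafeWrap
{
    /**
     * Applies the wrapper to the resource. If the wrapper throws, the resource
     * is closed before the failure propagates, so ownership is never lost.
     */
    static <R extends AutoCloseable, T> T wrapOrClose(R resource, Function<R, T> wrapper)
    {
        try
        {
            return wrapper.apply(resource);
        }
        catch (Throwable t)
        {
            try
            {
                resource.close();
            }
            catch (Throwable closeFailure)
            {
                // Keep the original failure as the primary one.
                t.addSuppressed(closeFailure);
            }
            throw t;
        }
    }
}
```

On success the resource stays open and the returned object is responsible for closing it.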



##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1695,6 +1699,31 @@ public static void applyPartitioner(Config conf)
         partitionerName = partitioner.getClass().getCanonicalName();
     }
 
+    private static DiskAccessMode 
resolveCompactionScanDiskAccessMode(DiskAccessMode defaultDiskAccessMode,
+                                                                      
DiskAccessMode compactionScanDiskAccessMode)
+    {
+        if (DiskAccessMode.auto == compactionScanDiskAccessMode)
+        {
+            return defaultDiskAccessMode;
+        }
+        else if (DiskAccessMode.direct == compactionScanDiskAccessMode)
+        {
+            if (conf.disk_optimization_strategy == 
Config.DiskOptimizationStrategy.ssd)

Review Comment:
   Should this be a limitation we enforce? We support read ahead. Maybe the 
user should be the one to decide if they want to have enough read ahead to 
support spinning disk with direct IO instead of having the kernel manage read 
ahead using OS settings?



##########
test/unit/org/apache/cassandra/io/util/StandardCompressedChunkReaderTest.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import accord.utils.Gens;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompressionParams;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+public class StandardCompressedChunkReaderTest extends 
CompressedChunkReaderTest
+{
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    @Test
+    public void scanReaderReadsLessThanRAReader()
+    {
+        var optionGen = writerOptions();
+        var paramsGen = 
compressionParams(Gens.constant(CompressionParams.DEFAULT_CHUNK_LENGTH));
+        var lengthGen = Gens.longs().between(1, 1 << 16);
+
+        qt().withSeed(-1871070464864118891L).forAll(Gens.random(), optionGen, 
paramsGen).check((rs, option, params) -> {

Review Comment:
   Hard coded seed intentional?



##########
src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java:
##########
@@ -51,23 +53,26 @@ protected Map<String, Block> initialValue()
     private final long channelSize;
 
     public ThreadLocalReadAheadBuffer(ChannelProxy channel, int bufferSize, 
BufferType bufferType)
+    {
+        this(channel, bufferSize, () -> bufferType.allocate(bufferSize));
+    }
+
+    ThreadLocalReadAheadBuffer(ChannelProxy channel, int bufferSize, 
Supplier<ByteBuffer> bufferSupplier)
     {
         this.channel = channel;
         this.channelSize = channel.size();
         this.bufferSize = bufferSize;
-        this.bufferType = bufferType;
+        this.bufferSupplier = bufferSupplier;
     }
 
     public boolean hasBuffer()
     {
         return block().buffer != null;
     }
 
-    /**
-     * Safe to call only if {@link #hasBuffer()} is true
-     */
     public int remaining()
     {
+        assert hasBuffer();

Review Comment:
   This could be a `checkState`. I think we are at the point mostly where we 
don't use `assert` so often because no one turns them off anyways. If it's 
going to be an `assert` it should have a message.
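
What I have in mind is Guava's `Preconditions.checkState(boolean, Object)`. The sketch below spells out the plain-Java equivalent so it has no dependencies. `ReadAheadBlock` is a hypothetical stand-in for the buffer class and the message text is only an example:

```java
import java.nio.ByteBuffer;

final class ReadAheadBlock
{
    private ByteBuffer buffer; // null until a block has been loaded

    boolean hasBuffer()
    {
        return buffer != null;
    }

    void load(ByteBuffer loaded)
    {
        this.buffer = loaded;
    }

    int remaining()
    {
        // Same effect as checkState(hasBuffer(), "..."): always enabled, with a message.
        if (!hasBuffer())
            throw new IllegalStateException("remaining() called before a block was loaded");
        return buffer.remaining();
    }
}
```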



##########
src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java:
##########
@@ -51,23 +53,26 @@ protected Map<String, Block> initialValue()
     private final long channelSize;
 
     public ThreadLocalReadAheadBuffer(ChannelProxy channel, int bufferSize, 
BufferType bufferType)

Review Comment:
   Consider removing buffer size and then just fetch it from the buffer?
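
A sketch of what that could look like once the constructor only takes a supplier. `SuppliedBuffer` is a hypothetical stand-in for the read-ahead buffer class, and the size is read from `ByteBuffer.capacity()`:

```java
import java.nio.ByteBuffer;
import java.util.function.Supplier;

final class SuppliedBuffer
{
    private final Supplier<ByteBuffer> bufferSupplier;
    private ByteBuffer buffer; // allocated lazily on first use

    SuppliedBuffer(Supplier<ByteBuffer> bufferSupplier)
    {
        this.bufferSupplier = bufferSupplier;
    }

    ByteBuffer buffer()
    {
        if (buffer == null)
            buffer = bufferSupplier.get();
        return buffer;
    }

    int bufferSize()
    {
        // The size comes from the buffer itself, so it cannot disagree with the supplier.
        return buffer().capacity();
    }
}
```

The trade-off is that asking for the size forces the allocation, which may or may not be acceptable here.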



##########
test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java:
##########
@@ -20,122 +20,50 @@
 
 import accord.utils.Gen;
 import accord.utils.Gens;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.filesystem.ListenableFileSystem;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.schema.CompressionParams;
-import org.assertj.core.api.Assertions;
 
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static accord.utils.Property.qt;
-
-public class CompressedChunkReaderTest
+public abstract class CompressedChunkReaderTest
 {
-    static
-    {
-        DatabaseDescriptor.clientInitialization();
-    }
 
-    @Test
-    public void scanReaderReadsLessThanRAReader()
+    static Gen<SequentialWriterOption> writerOptions()
     {
-        var optionGen = options();
-        var paramsGen = params();
-        var lengthGen = Gens.longs().between(1, 1 << 16);
-        qt().withSeed(-1871070464864118891L).forAll(Gens.random(), optionGen, 
paramsGen).check((rs, option, params) -> {
-            ListenableFileSystem fs = 
FileSystems.newGlobalInMemoryFileSystem();
-
-            File f = new File("/file.db");
-            AtomicInteger reads = new AtomicInteger();
-            fs.onPostRead(f.path::equals, (p, c, pos, dst, r) -> {
-                reads.incrementAndGet();
-            });
-            long length = lengthGen.nextLong(rs);
-            CompressionMetadata metadata1, metadata2;
-            try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, new File("/file.offset"), new 
File("/file.digest"), option, params, new MetadataCollector(new 
ClusteringComparator())))
-            {
-                for (long i = 0; i < length; i++)
-                    writer.writeLong(i);
-
-                writer.sync();
-                metadata1 = writer.open(0);
-                metadata2 = writer.open(0);
-            }
-
-            doReads(f, metadata1, length, true);
-            int scanReads = reads.getAndSet(0);
-
-            doReads(f, metadata2, length, false);
-            int raReads = reads.getAndSet(0);
-            
-            if (Files.size(f.toPath()) > 
DatabaseDescriptor.getCompressedReadAheadBufferSize())
-                Assert.assertTrue(scanReads < raReads);
-        });
+        Gen<Integer> bufferSizes = Gens.constant(1 << 10);
+        return rs -> writerOption(bufferSizes.next(rs));
     }
 
-    private void doReads(File f, CompressionMetadata metadata, long length, 
boolean useReadAhead)
+    static SequentialWriterOption writerOption(int bufferSize)
     {
-        ByteBuffer buffer = ByteBuffer.allocateDirect(metadata.chunkLength());
-
-        try (ChannelProxy channel = new ChannelProxy(f);
-             CompressedChunkReader reader = new 
CompressedChunkReader.Standard(channel, metadata, () -> 1.1);
-             metadata)
-        {
-            if (useReadAhead)
-                reader.forScan();
-
-            long offset = 0;
-            long maxOffset = length * Long.BYTES;
-            do
-            {
-                reader.readChunk(offset, buffer);
-                for (long expected = offset / Long.BYTES; 
buffer.hasRemaining(); expected++)
-                    
Assertions.assertThat(buffer.getLong()).isEqualTo(expected);
-
-                offset += metadata.chunkLength();
-            }
-            while (offset < maxOffset);
-        }
-        finally
-        {
-            FileUtils.clean(buffer);
-        }}
+        return SequentialWriterOption.newBuilder()
+                                     .finishOnClose(false)
+                                     .bufferSize(bufferSize)
+                                     .build();
+    }
 
-    private static Gen<SequentialWriterOption> options()
+    enum CompressionKind
     {
-        Gen<Integer> bufferSizes = Gens.constant(1 << 10); //.pickInt(1 << 4, 
1 << 10, 1 << 15);
-        return rs -> SequentialWriterOption.newBuilder()
-                                           .finishOnClose(false)
-                                           .bufferSize(bufferSizes.next(rs))
-                                           .build();
+        Noop, Snappy, Deflate, Lz4, Zstd
     }
 
-    private enum CompressionKind { Noop, Snappy, Deflate, Lz4, Zstd }
-
-    private static Gen<CompressionParams> params()
+    static Gen<CompressionParams> compressionParams(Gen<Integer> chunkLengths)
     {
-        Gen<Integer> chunkLengths = 
Gens.constant(CompressionParams.DEFAULT_CHUNK_LENGTH);
         Gen<Double> compressionRatio = Gens.pick(1.1D);
         return rs -> {
             CompressionKind kind = rs.pick(CompressionKind.values());
             switch (kind)
             {
-                case Noop: return CompressionParams.noop();
-                case Snappy: return 
CompressionParams.snappy(chunkLengths.next(rs), compressionRatio.next(rs));
-                case Deflate: return 
CompressionParams.deflate(chunkLengths.next(rs));
-                case Lz4: return CompressionParams.lz4(chunkLengths.next(rs));
-                case Zstd: return 
CompressionParams.zstd(chunkLengths.next(rs));
-                default: throw new UnsupportedOperationException(kind.name());
+                case Noop:

Review Comment:
   Try to avoid style churn.



##########
src/java/org/apache/cassandra/io/util/CompressedChunkReader.java:
##########
@@ -119,6 +119,52 @@ default void close()
         ByteBuffer read(CompressionMetadata.Chunk chunk, boolean 
shouldCheckCrc) throws CorruptBlockException;
     }
 
+    private static final class DirectRandomAccessReader implements 
CompressedReader
+    {
+
+        private final ChannelProxy channel;
+        private final int blockSize;
+        private final DirectThreadLocalByteBufferHolder bufferHolder;

Review Comment:
   We need to think through carefully the memory management here since we 
allocate these readers for each scan and then discard them immediately. We want 
the memory dedicated to this thread local to be returned promptly once the file 
handle is closed.



##########
src/java/org/apache/cassandra/io/util/FileHandle.java:
##########
@@ -322,13 +392,14 @@ public Builder withCrcCheckChance(Supplier<Double> 
crcCheckChanceSupplier)
          */
         public Builder mmapped(boolean mmapped)
         {
-            this.mmapped = mmapped;
+            if (mmapped)
+                withDiskAccessMode(DiskAccessMode.mmap);

Review Comment:
   Should this have an `else` for the false condition? Otherwise remove the 
parameter and have it always switch to mmapped.



##########
test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java:
##########
@@ -20,122 +20,50 @@
 
 import accord.utils.Gen;
 import accord.utils.Gens;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.filesystem.ListenableFileSystem;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.schema.CompressionParams;
-import org.assertj.core.api.Assertions;
 
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static accord.utils.Property.qt;
-
-public class CompressedChunkReaderTest
+public abstract class CompressedChunkReaderTest
 {
-    static
-    {
-        DatabaseDescriptor.clientInitialization();
-    }
 
-    @Test
-    public void scanReaderReadsLessThanRAReader()
+    static Gen<SequentialWriterOption> writerOptions()
     {
-        var optionGen = options();
-        var paramsGen = params();
-        var lengthGen = Gens.longs().between(1, 1 << 16);
-        qt().withSeed(-1871070464864118891L).forAll(Gens.random(), optionGen, 
paramsGen).check((rs, option, params) -> {
-            ListenableFileSystem fs = 
FileSystems.newGlobalInMemoryFileSystem();
-
-            File f = new File("/file.db");
-            AtomicInteger reads = new AtomicInteger();
-            fs.onPostRead(f.path::equals, (p, c, pos, dst, r) -> {
-                reads.incrementAndGet();
-            });
-            long length = lengthGen.nextLong(rs);
-            CompressionMetadata metadata1, metadata2;
-            try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, new File("/file.offset"), new 
File("/file.digest"), option, params, new MetadataCollector(new 
ClusteringComparator())))
-            {
-                for (long i = 0; i < length; i++)
-                    writer.writeLong(i);
-
-                writer.sync();
-                metadata1 = writer.open(0);
-                metadata2 = writer.open(0);
-            }
-
-            doReads(f, metadata1, length, true);
-            int scanReads = reads.getAndSet(0);
-
-            doReads(f, metadata2, length, false);
-            int raReads = reads.getAndSet(0);
-            
-            if (Files.size(f.toPath()) > 
DatabaseDescriptor.getCompressedReadAheadBufferSize())
-                Assert.assertTrue(scanReads < raReads);
-        });
+        Gen<Integer> bufferSizes = Gens.constant(1 << 10);
+        return rs -> writerOption(bufferSizes.next(rs));
     }
 
-    private void doReads(File f, CompressionMetadata metadata, long length, 
boolean useReadAhead)
+    static SequentialWriterOption writerOption(int bufferSize)
     {
-        ByteBuffer buffer = ByteBuffer.allocateDirect(metadata.chunkLength());
-
-        try (ChannelProxy channel = new ChannelProxy(f);
-             CompressedChunkReader reader = new 
CompressedChunkReader.Standard(channel, metadata, () -> 1.1);
-             metadata)
-        {
-            if (useReadAhead)
-                reader.forScan();
-
-            long offset = 0;
-            long maxOffset = length * Long.BYTES;
-            do
-            {
-                reader.readChunk(offset, buffer);
-                for (long expected = offset / Long.BYTES; 
buffer.hasRemaining(); expected++)
-                    
Assertions.assertThat(buffer.getLong()).isEqualTo(expected);
-
-                offset += metadata.chunkLength();
-            }
-            while (offset < maxOffset);
-        }
-        finally
-        {
-            FileUtils.clean(buffer);
-        }}
+        return SequentialWriterOption.newBuilder()
+                                     .finishOnClose(false)
+                                     .bufferSize(bufferSize)
+                                     .build();
+    }
 
-    private static Gen<SequentialWriterOption> options()
+    enum CompressionKind
     {
-        Gen<Integer> bufferSizes = Gens.constant(1 << 10); //.pickInt(1 << 4, 
1 << 10, 1 << 15);
-        return rs -> SequentialWriterOption.newBuilder()
-                                           .finishOnClose(false)
-                                           .bufferSize(bufferSizes.next(rs))
-                                           .build();
+        Noop, Snappy, Deflate, Lz4, Zstd

Review Comment:
   Style churn



##########
src/java/org/apache/cassandra/io/util/FileHandle.java:
##########
@@ -62,17 +70,41 @@ public class FileHandle extends SharedCloseableImpl
      */
     private final Optional<CompressionMetadata> compressionMetadata;
 
+    private final DiskAccessMode diskAccessMode;
+
+    // Properties to support unbuilding via toBuilder
+    private final ChunkCache chunkCache;

Review Comment:
   I don't love adding all this to support unbuild. Maybe just keep the builder 
and allow `complete` to be called multiple times? That is not thread safe 
though.
   
   I don't see a much better way to do it at the moment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

