blambov commented on code in PR #2064:
URL: https://github.com/apache/cassandra/pull/2064#discussion_r1069658209


##########
src/java/org/apache/cassandra/io/sstable/IndexSummary.java:
##########
@@ -352,6 +353,37 @@ public void remove()
         };
     }
 
+    public long getScanPositionFromBinarySearch(PartitionPosition key)

Review Comment:
   `FromBinarySearch` does not make much sense here. 
`getScanPosition(PartitionPosition)` would be better.



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1033,6 +1012,14 @@ protected void closeInternalComponent(AutoCloseable closeable)
         }
     }
 
+    /**
+     * This method is expected to close the components which occupy memory but are not needed when we just want to
+     * stream the components (for example, when SSTable is opened with SSTableLoader). The method should call
+     * {@link #closeInternalComponent(AutoCloseable)} for each such component. Leaving the implementation empty is
+     * valid, but may impact memory usage.
+     */
+    public abstract void releaseComponents();

Review Comment:
   Nit: This should include some adjective for the type of components released, 
e.g. `releaseInMemoryComponents`.



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -1890,22 +1890,17 @@ public void run()
         return executor.submitIfRunning(runnable, "cache write");
     }
 
-    public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+    public <T, E extends Throwable> T runWithActiveCompactions(Holder holder, ThrowingSupplier<T, E> callable) throws E

Review Comment:
   I don't understand this method's naming -- it runs the callable wrapped as an active compaction with the given info holder, doesn't it?
   
   Maybe `runAsActiveCompaction(CompactionInfo.Holder activeCompactionInfo, ThrowingSupplier callable)`?
   Or `<T, E extends Throwable, R extends CompactionInfo.Holder & ThrowingSupplier<T, E>> T runAsActiveCompaction(R callable) throws E`, using a single argument for both the operation info and the callable, to make it explicit that the info describes the operation the callable performs?
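   To make the second suggestion concrete, here is a minimal compilable sketch of the intersection-type shape. All names (`Holder`, `ThrowingSupplier`, the begin/finish hooks in comments) are stand-ins, not the actual `CompactionManager`/`CompactionInfo` API:

```java
// Hypothetical sketch: the intersection-type bound makes one argument serve as both
// the info holder and the operation to run, so the info can only describe the
// callable it accompanies. All names here are stand-ins.
public class ActiveCompactionSketch
{
    @FunctionalInterface
    public interface ThrowingSupplier<T, E extends Throwable>
    {
        T get() throws E;
    }

    // Stand-in for CompactionInfo.Holder.
    public static abstract class Holder {}

    public static <T, E extends Throwable, R extends Holder & ThrowingSupplier<T, E>>
    T runAsActiveCompaction(R operation) throws E
    {
        // beginCompaction(operation) would register the holder here
        try
        {
            return operation.get();
        }
        finally
        {
            // finishCompaction(operation) would deregister it here
        }
    }

    // Example operation: both an info holder and a callable.
    public static class Redistribution extends Holder implements ThrowingSupplier<String, RuntimeException>
    {
        public String get() { return "redistributed"; }
    }

    public static void main(String[] args)
    {
        System.out.println(runAsActiveCompaction(new Redistribution())); // prints "redistributed"
    }
}
```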



##########
src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java:
##########
@@ -82,46 +82,46 @@ public void build()
                PerSSTableIndexWriter indexWriter = SASIIndex.newWriter(keyValidator, sstable.descriptor, indexes, OperationType.COMPACTION);
                targetDirectory = indexWriter.getDescriptor().directory.path();
 
-                long previousKeyPosition = 0;
-                try (KeyIterator keys = new KeyIterator(sstable.descriptor, cfs.metadata()))
+                try (KeyReader keys = sstable.keyReader())
                {
-                    while (keys.hasNext())
+                    while (!keys.isExhausted())
                    {
                        if (isStopRequested())
                            throw new CompactionInterruptedException(getCompactionInfo());
 
-                        final DecoratedKey key = keys.next();
-                        final long keyPosition = keys.getKeyPosition();
+                        final DecoratedKey key = sstable.decorateKey(keys.key());
+                        final long keyPosition = keys.keyPositionForSecondaryIndex();
 
-                        indexWriter.startPartition(key, keyPosition);
+                        indexWriter.startPartition(key, keys.dataPosition(), keyPosition);
 
-                        try
-                        {
-                            long position = sstable.getPosition(key, SSTableReader.Operator.EQ);
-                            dataFile.seek(position);
-                            ByteBufferUtil.readWithShortLength(dataFile); // key
+                        long position = sstable.getPosition(key, SSTableReader.Operator.EQ);

Review Comment:
   Isn't this `keys.dataPosition()`?



##########
src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java:
##########
@@ -56,22 +55,23 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
     private final SortedMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> sstables;
 
     private long bytesProcessed = 0;
-    private final long totalSizeInBytes;
+    private final long totalBytesToProcess;
 
     public SASIIndexBuilder(ColumnFamilyStore cfs, SortedMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> sstables)
     {
-        long totalIndexBytes = 0;
+        long totalBytesToProcess = 0;
         for (SSTableReader sstable : sstables.keySet())
-            totalIndexBytes += getPrimaryIndexLength(sstable);
+            totalBytesToProcess += sstable.uncompressedLength();

Review Comment:
   We probably need a documentation change for this (progress changed from 
index bytes to data bytes).



##########
src/java/org/apache/cassandra/io/sstable/Descriptor.java:
##########
@@ -171,6 +172,18 @@ public List<File> getTemporaryFiles()
         return ret;
     }
 
+    public Set<Component> getComponents(Set<Component> alwaysAdd, Set<Component> optional)

Review Comment:
   Nit: alwaysAdd -> mandatory?



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1796,9 +1654,33 @@ public static void resetTidying()
         GlobalTidy.lookup.clear();
     }
 
-    public static abstract class Factory
+    public interface Factory<T extends SSTableReader>

Review Comment:
   Does this type parameter provide any benefit? The user shouldn't know the 
actual type and will generally keep the generic `SSTableReader` reference.



##########
src/java/org/apache/cassandra/io/sstable/format/FilterComponent.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.utils.BloomFilterSerializer;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+
+public class FilterComponent
+{
+    private static final Logger logger = LoggerFactory.getLogger(FilterComponent.class);
+
+    private FilterComponent()
+    {
+    }
+
+    /**
+     * Load bloom filter from Filter.db file.
+     */
+    public static IFilter load(Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+
+        if (!filterFile.exists())
+            return null;
+
+        if (filterFile.length() == 0)
+            return FilterFactory.AlwaysPresent;
+
+        try (FileInputStreamPlus stream = descriptor.fileFor(Component.FILTER).newInputStream())
+        {
+            return BloomFilterSerializer.forVersion(descriptor.version.hasOldBfFormat()).deserialize(stream);
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to load Bloom filter for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public static void save(IFilter filter, Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+        try (FileOutputStreamPlus stream = filterFile.newOutputStream(File.WriteMode.OVERWRITE))
+        {
+            filter.serialize(stream, descriptor.version.hasOldBfFormat());
+            stream.flush();
+            stream.sync(); // is it needed if we close the file right after that?

Review Comment:
   This call maps to `FileChannel.force`, which ensures that the data is 
flushed out to disk (i.e. persistent) before returning. Closing does not do 
that (it does do the flushing, but we want that to happen before the sync).
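   For reference, the close-vs-force distinction can be sketched with plain NIO. This is an illustrative stand-alone example, not Cassandra's `FileOutputStreamPlus`; `force(true)` is the durability barrier, while closing only releases the descriptor:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SyncSketch
{
    public static void writeDurably(Path path, byte[] data) throws IOException
    {
        try (FileChannel channel = FileChannel.open(path,
                                                    StandardOpenOption.CREATE,
                                                    StandardOpenOption.WRITE,
                                                    StandardOpenOption.TRUNCATE_EXISTING))
        {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining())
                channel.write(buf);
            // force(true) blocks until file content and metadata reach the storage
            // device; close() alone gives no such persistence guarantee.
            channel.force(true);
        }
    }

    public static void main(String[] args) throws IOException
    {
        Path tmp = Files.createTempFile("sync-sketch", ".bin");
        writeDurably(tmp, new byte[]{ 1, 2, 3 });
        System.out.println(Files.size(tmp)); // prints 3
        Files.deleteIfExists(tmp);
    }
}
```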



##########
src/java/org/apache/cassandra/io/sstable/format/FilterComponent.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.utils.BloomFilterSerializer;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+
+public class FilterComponent
+{
+    private static final Logger logger = LoggerFactory.getLogger(FilterComponent.class);
+
+    private FilterComponent()
+    {
+    }
+
+    /**
+     * Load bloom filter from Filter.db file.
+     */
+    public static IFilter load(Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+
+        if (!filterFile.exists())
+            return null;
+
+        if (filterFile.length() == 0)
+            return FilterFactory.AlwaysPresent;
+
+        try (FileInputStreamPlus stream = descriptor.fileFor(Component.FILTER).newInputStream())
+        {
+            return BloomFilterSerializer.forVersion(descriptor.version.hasOldBfFormat()).deserialize(stream);
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to load Bloom filter for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public static void save(IFilter filter, Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+        try (FileOutputStreamPlus stream = filterFile.newOutputStream(File.WriteMode.OVERWRITE))
+        {
+            filter.serialize(stream, descriptor.version.hasOldBfFormat());
+            stream.flush();
+            stream.sync(); // is it needed if we close the file right after that?
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to save Bloom filter for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public static void saveOrDeleteCorrupted(Descriptor descriptor, IFilter filter) throws IOException

Review Comment:
   The name is a little misleading, because it implies one or the other while 
we do one or both. Maybe `saveWithDeleteOnFailure` or something similar?



##########
src/java/org/apache/cassandra/utils/BloomFilterSerializer.java:
##########
@@ -17,47 +17,66 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IGenericSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.obs.IBitSet;
 import org.apache.cassandra.utils.obs.OffHeapBitSet;
 
-public final class BloomFilterSerializer
+public final class BloomFilterSerializer<I extends InputStream & DataInputPlus, O extends OutputStream & DataOutputPlus> implements IGenericSerializer<BloomFilter, I, O>

Review Comment:
   Is there any benefit from using these types rather than 
`DataInput/OutputStreamPlus`?
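   For illustration, the benefit of an intersection bound is that callers can pass anything that is both an `InputStream` and a `DataInput`-style type without the serializer naming one concrete class. In this hedged, stand-alone sketch, `java.io.DataInput` stands in for `DataInputPlus`:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustration only: java.io.DataInput stands in for Cassandra's DataInputPlus.
public class IntersectionBoundSketch
{
    // The intersection bound accepts any stream that is both an InputStream
    // and a DataInput, without committing to one concrete class.
    public static <I extends InputStream & DataInput> int readLength(I in) throws IOException
    {
        return in.readInt();
    }

    public static void main(String[] args) throws IOException
    {
        byte[] bytes = { 0, 0, 0, 42 };
        // DataInputStream satisfies both sides of the bound.
        System.out.println(readLength(new DataInputStream(new ByteArrayInputStream(bytes)))); // prints 42
    }
}
```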



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java:
##########
@@ -439,12 +396,12 @@ void load(boolean recreateBloomFilter,
                   StatsMetadata statsMetadata,
                   Set<Component> components) throws IOException
         {
-            try(FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
-                    .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                    .withChunkCache(ChunkCache.instance);
-                    FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(components.contains(Component.COMPRESSION_INFO))
-                                                                                                                .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                                                                                                                .withChunkCache(ChunkCache.instance))
+            try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+                                               .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                                               .withChunkCache(ChunkCache.instance);
+                 FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(components.contains(Component.COMPRESSION_INFO))

Review Comment:
   Nit: perhaps move to a new line and align with `new` as above?


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java:
##########
@@ -301,11 +257,11 @@ public SSTableReader build()
 
             boolean compression = components.contains(Component.COMPRESSION_INFO);
             try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
-                    .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                    .withChunkCache(ChunkCache.instance);
-                    FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
-                                                                                                                .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                                                                                                                .withChunkCache(ChunkCache.instance))
+                                               .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                                               .withChunkCache(ChunkCache.instance);
+                 FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)

Review Comment:
   Nit: perhaps move to new line and align with `new` as above?



##########
src/java/org/apache/cassandra/io/sstable/format/big/IndexSummaryComponent.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class IndexSummaryComponent
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummaryComponent.class);
+
+    public final IndexSummary indexSummary;
+    public final DecoratedKey first;
+    public final DecoratedKey last;
+
+    public IndexSummaryComponent(IndexSummary indexSummary, DecoratedKey first, DecoratedKey last)
+    {
+        this.indexSummary = indexSummary;
+        this.first = first;
+        this.last = last;
+    }
+
+    /**
+     * Load index summary, first key and last key from Summary.db file if it exists.
+     * <p>
+     * If the loaded index summary has a different index interval from the current value stored in schema,
+     * the Summary.db file will be deleted and needs to be rebuilt.
+     */
+    public static IndexSummaryComponent load(Descriptor descriptor, TableMetadata metadata) throws IOException
+    {
+        File summaryFile = descriptor.fileFor(Component.SUMMARY);
+        if (!summaryFile.exists())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("SSTable Summary File {} does not exist", summaryFile.absolutePath());
+            return null;
+        }
+
+        IndexSummary summary = null;
+        try (FileInputStreamPlus iStream = summaryFile.newInputStream())
+        {
+            summary = IndexSummary.serializer.deserialize(iStream,
+                                                          metadata.partitioner,
+                                                          metadata.params.minIndexInterval,
+                                                          metadata.params.maxIndexInterval);
+            DecoratedKey first = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+            DecoratedKey last = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+
+            return new IndexSummaryComponent(summary, first, last);
+        }
+        catch (IOException ex)
+        {
+            if (summary != null)
+                summary.close();
+
+            throw new IOException(String.format("Cannot deserialize SSTable %s component: %s", Component.SUMMARY.name, summaryFile), ex);
+        }
+    }
+
+    public static IndexSummaryComponent loadOrDeleteCorrupted(Descriptor descriptor, TableMetadata metadata) throws IOException
+    {
+        try
+        {
+            return load(descriptor, metadata);
+        }
+        catch (IOException ex)
+        {
+            descriptor.fileFor(Component.SUMMARY).deleteIfExists();
+            throw ex;
+        }
+    }
+
+    /**
+     * Save index summary to Summary.db file.
+     */
+    public void save(Descriptor descriptor) throws IOException
+    {
+        File summaryFile = descriptor.fileFor(Component.SUMMARY);
+        if (summaryFile.exists())
+            summaryFile.delete();
+
+        try (DataOutputStreamPlus oStream = summaryFile.newOutputStream(File.WriteMode.OVERWRITE))
+        {
+            IndexSummary.serializer.serialize(indexSummary, oStream);
+            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
+            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to save index summary for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public void saveOrDeleteCorrupted(Descriptor descriptor) throws IOException

Review Comment:
   As for bloom filter, change to `WithDeleteIfFailed` or similar.



##########
src/java/org/apache/cassandra/io/sstable/SSTable.java:
##########
@@ -155,9 +155,9 @@ public DecoratedKey decorateKey(ByteBuffer key)
      */
     public static DecoratedKey getMinimalKey(DecoratedKey key)

Review Comment:
   Nit: could this be renamed? E.g. to `onHeapMinimized`? Or something in the 
vein of `retainable` like the clusterings?
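   For context, a hedged sketch of the idea behind a "minimal"/"retainable" key (plain NIO, not the actual `getMinimalKey` implementation): copy a key that may be a small slice of a large, possibly mmap'd buffer into a compact heap buffer, so retaining the key does not pin the whole backing region.

```java
import java.nio.ByteBuffer;

// Hedged sketch only; names and the exact compactness check are illustrative.
public class MinimalKeySketch
{
    public static ByteBuffer minimize(ByteBuffer key)
    {
        // Already compact: a heap buffer that exactly spans its own backing array.
        if (key.hasArray() && key.arrayOffset() == 0 && key.position() == 0 && key.capacity() == key.remaining())
            return key;
        ByteBuffer copy = ByteBuffer.allocate(key.remaining());
        copy.put(key.duplicate());
        copy.flip();
        return copy;
    }

    public static void main(String[] args)
    {
        ByteBuffer big = ByteBuffer.allocateDirect(1 << 20); // stands in for an mmap'd region
        big.put(new byte[]{ 'k', 'e', 'y' }).flip();
        System.out.println(minimize(big).capacity()); // prints 3
    }
}
```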



##########
src/java/org/apache/cassandra/io/sstable/Descriptor.java:
##########
@@ -423,6 +425,19 @@ public boolean isCompatible()
         return version.isCompatible();
     }
 
+    public Set<Component> discoverComponents()
+    {
+        Set<Component.Type> knownTypes = Sets.difference(Component.TYPES, Collections.singleton(Component.Type.CUSTOM));
+        Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size());
+        for (Component.Type componentType : knownTypes)
+        {
+            Component component = new Component(componentType);

Review Comment:
   Any reason to not make these singletons?
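   For illustration, interning one instance per type could look like this flyweight sketch (all names are stand-ins for the actual `Component` API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative flyweight with stand-in names: intern one instance per component
// type instead of allocating a fresh Component on every discovery pass.
public class ComponentInterning
{
    public enum Type { DATA, PRIMARY_INDEX, FILTER, SUMMARY }

    public static final class Component
    {
        public final Type type;

        private Component(Type type)
        {
            this.type = type;
        }

        private static final Map<Type, Component> INSTANCES = new ConcurrentHashMap<>();

        // Returns the canonical instance for the given type.
        public static Component of(Type type)
        {
            return INSTANCES.computeIfAbsent(type, Component::new);
        }
    }

    public static void main(String[] args)
    {
        // The same type always yields the same instance, so identity comparison works.
        System.out.println(Component.of(Type.FILTER) == Component.of(Type.FILTER)); // prints true
    }
}
```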



##########
test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java:
##########
@@ -388,10 +387,10 @@ private void testPosition(IndexSummary original, IndexSummary downsampled, List<
     {
         for (DecoratedKey key : keys)
         {
-            long orig = SSTableReader.getIndexScanPositionFromBinarySearchResult(original.binarySearch(key), original);
+            long orig = original.getScanPositionFromBinarySearchResult(original.binarySearch(key));

Review Comment:
   This can be `original.getScanPosition(key)`.



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java:
##########
@@ -395,12 +393,13 @@ void load(boolean recreateBloomFilter,
                   StatsMetadata statsMetadata,
                   Set<Component> components) throws IOException
         {
-            try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
-                                               .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                                               .withChunkCache(ChunkCache.instance);
-                 FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                                                                                                             .withChunkCache(ChunkCache.instance))
+            try
             {
+                FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.fileFor(Component.PRIMARY_INDEX))

Review Comment:
   Is everything that can go wrong / throw an exception covered in the 
`complete` block? Can we leave mmap regions or memory (e.g. compression 
offsets) unreleased?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

