blambov commented on code in PR #2064:
URL: https://github.com/apache/cassandra/pull/2064#discussion_r1069658209
##########
src/java/org/apache/cassandra/io/sstable/IndexSummary.java:
##########
@@ -352,6 +353,37 @@ public void remove()
};
}
+ public long getScanPositionFromBinarySearch(PartitionPosition key)
Review Comment:
`FromBinarySearch` does not make much sense here.
`getScanPosition(PartitionPosition)` would be better.
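For illustration, the suggested shape might look roughly like this (a sketch only; it assumes `binarySearch` and `getScanPositionFromBinarySearchResult` remain as the existing helpers on `IndexSummary`):
```java
// Sketch: expose the lookup by key and keep the binary search as an
// implementation detail instead of surfacing it in the method name.
public long getScanPosition(PartitionPosition key)
{
    return getScanPositionFromBinarySearchResult(binarySearch(key));
}
```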
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1033,6 +1012,14 @@ protected void closeInternalComponent(AutoCloseable closeable)
}
}
+    /**
+     * This method is expected to close the components which occupy memory but are not needed when we just want to
+     * stream the components (for example, when SSTable is opened with SSTableLoader). The method should call
+     * {@link #closeInternalComponent(AutoCloseable)} for each such component. Leaving the implementation empty is
+     * valid, but may impact memory usage.
+     */
+    public abstract void releaseComponents();
Review Comment:
Nit: This should include some adjective for the type of components released,
e.g. `releaseInMemoryComponents`.
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -1890,22 +1890,17 @@ public void run()
return executor.submitIfRunning(runnable, "cache write");
}
-    public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+    public <T, E extends Throwable> T runWithActiveCompactions(Holder holder, ThrowingSupplier<T, E> callable) throws E
Review Comment:
I don't understand this method's naming -- it is running the callable wrapped as an active compaction with the given info holder?
Maybe `runAsActiveCompaction(CompactionInfo.Holder activeCompactionInfo, ThrowingSupplier callable)`?
Or `<T, E extends Throwable, R extends CompactionInfo.Holder & ThrowingSupplier<T, E>> T runAsActiveCompaction(R callable) throws E`, using a single argument for both the operation info and the callable, to make it explicit that the info describes the operation the callable performs?
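For concreteness, the second option might look roughly like this (a sketch only; it assumes the existing `active` tracker's `beginCompaction`/`finishCompaction` and that `ThrowingSupplier` exposes a `get()` method):
```java
// Sketch: a single argument serves both as the CompactionInfo.Holder that is
// registered as active and as the work to run, so the info can only describe
// the operation that the callable performs.
public <T, E extends Throwable, R extends CompactionInfo.Holder & ThrowingSupplier<T, E>>
T runAsActiveCompaction(R operation) throws E
{
    active.beginCompaction(operation);      // register as an active compaction (assumed tracker API)
    try
    {
        return operation.get();             // run the wrapped work
    }
    finally
    {
        active.finishCompaction(operation); // always deregister
    }
}
```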
##########
src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java:
##########
@@ -82,46 +82,46 @@ public void build()
                PerSSTableIndexWriter indexWriter = SASIIndex.newWriter(keyValidator, sstable.descriptor, indexes, OperationType.COMPACTION);
                targetDirectory = indexWriter.getDescriptor().directory.path();
-               long previousKeyPosition = 0;
-               try (KeyIterator keys = new KeyIterator(sstable.descriptor, cfs.metadata()))
+               try (KeyReader keys = sstable.keyReader())
                {
-                   while (keys.hasNext())
+                   while (!keys.isExhausted())
                    {
                        if (isStopRequested())
                            throw new CompactionInterruptedException(getCompactionInfo());
-                       final DecoratedKey key = keys.next();
-                       final long keyPosition = keys.getKeyPosition();
+                       final DecoratedKey key = sstable.decorateKey(keys.key());
+                       final long keyPosition = keys.keyPositionForSecondaryIndex();
-                       indexWriter.startPartition(key, keyPosition);
+                       indexWriter.startPartition(key, keys.dataPosition(), keyPosition);
-                       try
-                       {
-                           long position = sstable.getPosition(key, SSTableReader.Operator.EQ);
-                           dataFile.seek(position);
-                           ByteBufferUtil.readWithShortLength(dataFile); // key
+                       long position = sstable.getPosition(key, SSTableReader.Operator.EQ);
Review Comment:
Isn't this `keys.dataPosition()`?
##########
src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java:
##########
@@ -56,22 +55,23 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
    private final SortedMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> sstables;
    private long bytesProcessed = 0;
-   private final long totalSizeInBytes;
+   private final long totalBytesToProcess;
    public SASIIndexBuilder(ColumnFamilyStore cfs, SortedMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> sstables)
    {
-       long totalIndexBytes = 0;
+       long totalBytesToProcess = 0;
        for (SSTableReader sstable : sstables.keySet())
-           totalIndexBytes += getPrimaryIndexLength(sstable);
+           totalBytesToProcess += sstable.uncompressedLength();
Review Comment:
We probably need a documentation change for this (progress changed from
index bytes to data bytes).
##########
src/java/org/apache/cassandra/io/sstable/Descriptor.java:
##########
@@ -171,6 +172,18 @@ public List<File> getTemporaryFiles()
return ret;
}
+    public Set<Component> getComponents(Set<Component> alwaysAdd, Set<Component> optional)
Review Comment:
Nit: alwaysAdd -> mandatory?
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1796,9 +1654,33 @@ public static void resetTidying()
GlobalTidy.lookup.clear();
}
- public static abstract class Factory
+ public interface Factory<T extends SSTableReader>
Review Comment:
Does this type parameter provide any benefit? The user shouldn't know the
actual type and will generally keep the generic `SSTableReader` reference.
##########
src/java/org/apache/cassandra/io/sstable/format/FilterComponent.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.utils.BloomFilterSerializer;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+
+public class FilterComponent
+{
+    private static final Logger logger = LoggerFactory.getLogger(FilterComponent.class);
+
+    private FilterComponent()
+    {
+    }
+
+    /**
+     * Load bloom filter from Filter.db file.
+     */
+    public static IFilter load(Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+
+        if (!filterFile.exists())
+            return null;
+
+        if (filterFile.length() == 0)
+            return FilterFactory.AlwaysPresent;
+
+        try (FileInputStreamPlus stream = descriptor.fileFor(Component.FILTER).newInputStream())
+        {
+            return BloomFilterSerializer.forVersion(descriptor.version.hasOldBfFormat()).deserialize(stream);
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to load Bloom filter for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public static void save(IFilter filter, Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+        try (FileOutputStreamPlus stream = filterFile.newOutputStream(File.WriteMode.OVERWRITE))
+        {
+            filter.serialize(stream, descriptor.version.hasOldBfFormat());
+            stream.flush();
+            stream.sync(); // is it needed if we close the file right after that?
Review Comment:
This call maps to `FileChannel.force`, which ensures that the data is flushed out to disk (i.e. persistent) before returning. Closing does not give that guarantee (close does flush, but we want the flush to happen before the sync, hence the explicit calls in this order).
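To spell out the distinction in plain NIO terms (an illustrative sketch, not the PR code; `writeDurably`, its path and buffer are made up for the example):
```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DurabilityExample
{
    // Closing a channel pushes buffered bytes to the OS, but does not guarantee
    // they reach stable storage; force() (what sync() maps to) does.
    static void writeDurably(Path path, ByteBuffer buffer) throws IOException
    {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
        {
            ch.write(buffer);  // data may still sit in the OS page cache here
            ch.force(true);    // blocks until data (and metadata) are persisted
        }                      // close() alone would not provide that guarantee
    }
}
```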
##########
src/java/org/apache/cassandra/io/sstable/format/FilterComponent.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.utils.BloomFilterSerializer;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+
+public class FilterComponent
+{
+    private static final Logger logger = LoggerFactory.getLogger(FilterComponent.class);
+
+    private FilterComponent()
+    {
+    }
+
+    /**
+     * Load bloom filter from Filter.db file.
+     */
+    public static IFilter load(Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+
+        if (!filterFile.exists())
+            return null;
+
+        if (filterFile.length() == 0)
+            return FilterFactory.AlwaysPresent;
+
+        try (FileInputStreamPlus stream = descriptor.fileFor(Component.FILTER).newInputStream())
+        {
+            return BloomFilterSerializer.forVersion(descriptor.version.hasOldBfFormat()).deserialize(stream);
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to load Bloom filter for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public static void save(IFilter filter, Descriptor descriptor) throws IOException
+    {
+        File filterFile = descriptor.fileFor(Component.FILTER);
+        try (FileOutputStreamPlus stream = filterFile.newOutputStream(File.WriteMode.OVERWRITE))
+        {
+            filter.serialize(stream, descriptor.version.hasOldBfFormat());
+            stream.flush();
+            stream.sync(); // is it needed if we close the file right after that?
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to save Bloom filter for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public static void saveOrDeleteCorrupted(Descriptor descriptor, IFilter filter) throws IOException
Review Comment:
The name is a little misleading, because it implies one or the other while
we do one or both. Maybe `saveWithDeleteOnFailure` or something similar?
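In other words, something like this (a sketch; the body just mirrors the existing save-then-delete-on-error flow):
```java
// Sketch of the suggested name: the delete only happens when save() fails.
public static void saveWithDeleteOnFailure(Descriptor descriptor, IFilter filter) throws IOException
{
    try
    {
        save(filter, descriptor);
    }
    catch (IOException ex)
    {
        descriptor.fileFor(Component.FILTER).deleteIfExists();
        throw ex;
    }
}
```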
##########
src/java/org/apache/cassandra/utils/BloomFilterSerializer.java:
##########
@@ -17,47 +17,66 @@
*/
package org.apache.cassandra.utils;
-import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IGenericSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.obs.IBitSet;
import org.apache.cassandra.utils.obs.OffHeapBitSet;
-public final class BloomFilterSerializer
+public final class BloomFilterSerializer<I extends InputStream & DataInputPlus, O extends OutputStream & DataOutputPlus> implements IGenericSerializer<BloomFilter, I, O>
Review Comment:
Is there any benefit from using these types rather than
`DataInput/OutputStreamPlus`?
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java:
##########
@@ -439,12 +396,12 @@ void load(boolean recreateBloomFilter,
StatsMetadata statsMetadata,
Set<Component> components) throws IOException
{
-        try(FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
-                .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                .withChunkCache(ChunkCache.instance);
-             FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(components.contains(Component.COMPRESSION_INFO))
-                                                          .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                                                          .withChunkCache(ChunkCache.instance))
+        try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+                                           .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                                           .withChunkCache(ChunkCache.instance);
+             FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(components.contains(Component.COMPRESSION_INFO))
Review Comment:
Nit: perhaps move to new line and align with `new` as above?
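i.e., roughly this layout (a formatting sketch of the quoted block, not actual PR code):
```java
try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
                                   .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
                                   .withChunkCache(ChunkCache.instance);
     FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA))
                                   .compressed(components.contains(Component.COMPRESSION_INFO))
                                   .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
                                   .withChunkCache(ChunkCache.instance))
{
    // ...
}
```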
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java:
##########
@@ -301,11 +257,11 @@ public SSTableReader build()
        boolean compression = components.contains(Component.COMPRESSION_INFO);
        try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
-                .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                .withChunkCache(ChunkCache.instance);
-             FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
-                .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                .withChunkCache(ChunkCache.instance))
+                                           .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                                           .withChunkCache(ChunkCache.instance);
+             FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
Review Comment:
Nit: perhaps move to new line and align with `new` as above?
##########
src/java/org/apache/cassandra/io/sstable/format/big/IndexSummaryComponent.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class IndexSummaryComponent
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummaryComponent.class);
+
+    public final IndexSummary indexSummary;
+    public final DecoratedKey first;
+    public final DecoratedKey last;
+
+    public IndexSummaryComponent(IndexSummary indexSummary, DecoratedKey first, DecoratedKey last)
+    {
+        this.indexSummary = indexSummary;
+        this.first = first;
+        this.last = last;
+    }
+
+    /**
+     * Load index summary, first key and last key from Summary.db file if it exists.
+     * <p>
+     * if loaded index summary has different index interval from current value stored in schema,
+     * then Summary.db file will be deleted and need to be rebuilt.
+     */
+    public static IndexSummaryComponent load(Descriptor descriptor, TableMetadata metadata) throws IOException
+    {
+        File summaryFile = descriptor.fileFor(Component.SUMMARY);
+        if (!summaryFile.exists())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("SSTable Summary File {} does not exist", summaryFile.absolutePath());
+            return null;
+        }
+
+        IndexSummary summary = null;
+        try (FileInputStreamPlus iStream = summaryFile.newInputStream())
+        {
+            summary = IndexSummary.serializer.deserialize(iStream,
+                                                          metadata.partitioner,
+                                                          metadata.params.minIndexInterval,
+                                                          metadata.params.maxIndexInterval);
+            DecoratedKey first = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+            DecoratedKey last = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+
+            return new IndexSummaryComponent(summary, first, last);
+        }
+        catch (IOException ex)
+        {
+            if (summary != null)
+                summary.close();
+
+            throw new IOException(String.format("Cannot deserialize SSTable %s component: %s", Component.SUMMARY.name, summaryFile), ex);
+        }
+    }
+
+    public static IndexSummaryComponent loadOrDeleteCorrupted(Descriptor descriptor, TableMetadata metadata) throws IOException
+    {
+        try
+        {
+            return load(descriptor, metadata);
+        }
+        catch (IOException ex)
+        {
+            descriptor.fileFor(Component.SUMMARY).deleteIfExists();
+            throw ex;
+        }
+    }
+
+    /**
+     * Save index summary to Summary.db file.
+     */
+    public void save(Descriptor descriptor) throws IOException
+    {
+        File summaryFile = descriptor.fileFor(Component.SUMMARY);
+        if (summaryFile.exists())
+            summaryFile.delete();
+
+        try (DataOutputStreamPlus oStream = summaryFile.newOutputStream(File.WriteMode.OVERWRITE))
+        {
+            IndexSummary.serializer.serialize(indexSummary, oStream);
+            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
+            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
+        }
+        catch (IOException ex)
+        {
+            throw new IOException("Failed to save index summary for SSTable: " + descriptor.baseFilename(), ex);
+        }
+    }
+
+    public void saveOrDeleteCorrupted(Descriptor descriptor) throws IOException
Review Comment:
As with the Bloom filter component, change to `WithDeleteIfFailed` or similar.
##########
src/java/org/apache/cassandra/io/sstable/SSTable.java:
##########
@@ -155,9 +155,9 @@ public DecoratedKey decorateKey(ByteBuffer key)
*/
public static DecoratedKey getMinimalKey(DecoratedKey key)
Review Comment:
Nit: could this be renamed? E.g. to `onHeapMinimized`? Or something in the
vein of `retainable` like the clusterings?
##########
src/java/org/apache/cassandra/io/sstable/Descriptor.java:
##########
@@ -423,6 +425,19 @@ public boolean isCompatible()
return version.isCompatible();
}
+    public Set<Component> discoverComponents()
+    {
+        Set<Component.Type> knownTypes = Sets.difference(Component.TYPES, Collections.singleton(Component.Type.CUSTOM));
+        Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size());
+        for (Component.Type componentType : knownTypes)
+        {
+            Component component = new Component(componentType);
Review Comment:
Any reason to not make these singletons?
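For illustration, a per-type cache could look something like this (a sketch only; the `SHARED` map and its use by `discoverComponents()` are hypothetical, not the existing API):
```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch: build one shared Component per non-custom type once, and have
// discoverComponents() reuse it instead of allocating `new Component(type)`
// on every discovery pass.
static final Map<Component.Type, Component> SHARED =
        Component.TYPES.stream()
                       .filter(type -> type != Component.Type.CUSTOM)
                       .collect(Collectors.toMap(Function.identity(), Component::new));
```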
##########
test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java:
##########
@@ -388,10 +387,10 @@ private void testPosition(IndexSummary original, IndexSummary downsampled, List<
    {
        for (DecoratedKey key : keys)
        {
-            long orig = SSTableReader.getIndexScanPositionFromBinarySearchResult(original.binarySearch(key), original);
+            long orig = original.getScanPositionFromBinarySearchResult(original.binarySearch(key));
Review Comment:
This can be `original.getScanPosition(key)`.
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java:
##########
@@ -395,12 +393,13 @@ void load(boolean recreateBloomFilter,
StatsMetadata statsMetadata,
Set<Component> components) throws IOException
{
-        try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
-                 .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                 .withChunkCache(ChunkCache.instance);
-             FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                 .withChunkCache(ChunkCache.instance))
+        try
         {
+            FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.fileFor(Component.PRIMARY_INDEX))
Review Comment:
Is everything that can go wrong / throw an exception covered in the
`complete` block? Can we leave mmap regions or memory (e.g. compression
offsets) unreleased?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]