jacek-lewandowski commented on code in PR #3168:
URL: https://github.com/apache/cassandra/pull/3168#discussion_r1553329384
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -2666,11 +2670,21 @@ public static void
setFlushCompression(Config.FlushCompression compression)
conf.flush_compression = compression;
}
- /**
- * Maximum number of buffers in the compression pool. The default value is
3, it should not be set lower than that
- * (one segment in compression, one written to, one in reserve); delays in
compression may cause the log to use
- * more, depending on how soon the sync policy stops all writing threads.
- */
+ public static ParameterizedClass getSSTableCompression()
+ {
+ return sstableCompression;
+ }
+
+ public static void setSSTableCompression(ParameterizedClass compressor)
+ {
+ conf.sstable.default_compression = compressor;
Review Comment:
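This seems to have no effect on subsequent calls to
`getSSTableCompression`: the setter writes `conf.sstable.default_compression`,
while the getter returns the separate `sstableCompression` field.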
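To illustrate the concern, here is a minimal sketch. `SketchConfig` and
`SketchDescriptor` are hypothetical stand-ins, not the real `Config` or
`DatabaseDescriptor`, and a plain `String` replaces `ParameterizedClass`. It
assumes the getter reads a cached static field, as the diff suggests, so a
setter that only touches `conf` leaves the getter returning the old value.

```java
// Hypothetical stand-ins for Config / DatabaseDescriptor; not the real Cassandra classes.
final class SketchConfig
{
    String defaultCompression = "lz4";
}

final class SketchDescriptor
{
    static final SketchConfig conf = new SketchConfig();

    // Cached copy populated once from conf (mirrors the `sstableCompression` field in the diff).
    static String sstableCompression = conf.defaultCompression;

    static String getDefaultSSTableCompression()
    {
        return sstableCompression;
    }

    // Same shape as the setter in the PR: only conf is updated, the cached field is not.
    static void setOnlyConf(String compressor)
    {
        conf.defaultCompression = compressor;
    }

    // One possible fix: keep the cached field in sync with conf.
    static void setDefaultSSTableCompression(String compressor)
    {
        conf.defaultCompression = compressor;
        sstableCompression = compressor;
    }
}
```

An alternative would be to drop the cached field and have the getter read
from `conf` directly; which one fits better depends on how the field is
initialised elsewhere in the PR.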
##########
test/conf/cassandra-with-sstable-compressor.yaml:
##########
@@ -0,0 +1,111 @@
+
Review Comment:
We already have a YAML file for compression.
##########
conf/cassandra.yaml:
##########
@@ -1113,6 +1113,46 @@ snapshot_links_per_second: 0
#sstable:
# selected_format: big
+# Defines the default compression used on tables when none is specified
+# in the CQL command.
+#
+# The class_name is the compressor class name. It may be one of the aliases,
+# the class name of a system ICompressor implementation, or fully qualified
+# name of a ICompressor class.
+#
+# class aliases are:
+# Alias System compressor impl.
+# deflate DeflateCompressor
+# lz4 LZ4Compressor
+# none (null) -- compresson disabled
+# noop NoopCompressor
Review Comment:
When would a user want to use `noop` over `none`?
##########
conf/cassandra.yaml:
##########
@@ -1113,6 +1113,46 @@ snapshot_links_per_second: 0
#sstable:
# selected_format: big
+# Defines the default compression used on tables when none is specified
+# in the CQL command.
+#
+# The class_name is the compressor class name. It may be one of the aliases,
+# the class name of a system ICompressor implementation, or fully qualified
+# name of a ICompressor class.
+#
+# class aliases are:
+# Alias System compressor impl.
+# deflate DeflateCompressor
+# lz4 LZ4Compressor
+# none (null) -- compresson disabled
+# noop NoopCompressor
+# snappy SnappyCompressor
+# zstd ZstdCompressor
+#
+# The standard parameters are any required or optional parameter for the
instantiation of the
+# specified class, or one of the following standard parameters:
+# Parameter Usage
+# enabled Disables compression if set to false. Defaults to true.
Review Comment:
What does it mean for the compressor to be enabled or not?
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -2666,11 +2670,21 @@ public static void
setFlushCompression(Config.FlushCompression compression)
conf.flush_compression = compression;
}
- /**
- * Maximum number of buffers in the compression pool. The default value is
3, it should not be set lower than that
- * (one segment in compression, one written to, one in reserve); delays in
compression may cause the log to use
- * more, depending on how soon the sync policy stops all writing threads.
- */
+ public static ParameterizedClass getSSTableCompression()
Review Comment:
These methods should be named `(get|set)DefaultSSTableCompression`, right?
##########
src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java:
##########
@@ -557,14 +557,14 @@ public void validate(ClientState state)
MemtableParams.get(attrs.getString(TableParams.Option.MEMTABLE.toString()));
Guardrails.tableProperties.guard(attrs.updatedProperties(),
attrs::removeProperty, state);
- validateDefaultTimeToLive(attrs.asNewTableParams());
+ validateDefaultTimeToLive(attrs.asNewTableParams(keyspaceName));
Review Comment:
Actually, I'm not sure why we need this. Is it just to decide whether
compression should be used for the system tables? Why not make the system
tables explicitly use no compression?
##########
src/java/org/apache/cassandra/schema/CompressionParams.java:
##########
@@ -41,163 +47,316 @@
import org.apache.cassandra.net.MessagingService;
import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT;
public final class CompressionParams
{
- public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
+ public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB
public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; //
Since pre-4.0 versions do not understand the
// new
compression parameter we can't use a
//
different default value.
public static final IVersionedSerializer<CompressionParams> serializer =
new Serializer();
public static final String CLASS = "class";
public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+ /** Requires a DataStorageSpec suffix */
+ public static final String CHUNK_LENGTH = "chunk_length";
+ /** Requires a DataStorageSpec suffix */
+ public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length";
public static final String ENABLED = "enabled";
public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
- public static final CompressionParams DEFAULT =
!CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
- ? noCompression()
- : new
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
-
DEFAULT_CHUNK_LENGTH,
-
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-
DEFAULT_MIN_COMPRESS_RATIO,
-
Collections.emptyMap());
-
- public static final CompressionParams NOOP = new
CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+ /** A compressor that does nothing */
+ public static final CompressionParams NOOP = new
CompressionParams(NoopCompressor.create(emptyMap()),
// 4
KiB is often the underlying disk block size
1024 *
4,
Integer.MAX_VALUE,
DEFAULT_MIN_COMPRESS_RATIO,
-
Collections.emptyMap());
+
emptyMap());
+
+ /** (legacy) Default for when no other compression is specified.
+ * You probalby want defaultParams(keyspace) rather than this. */
+ public static final CompressionParams DEFAULT = new
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+
DEFAULT_CHUNK_LENGTH,
+
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+
DEFAULT_MIN_COMPRESS_RATIO,
+
emptyMap());
+
+ /** A guaranteed FAST compressor */
+ public static final CompressionParams FAST = DEFAULT;
+
+ /** The default calculated fromthe config.yaml */
+ private static CompressionParams CALCULATED_DEFAULT;
+
+ /** error message format for when 'chunk_length' and 'chunklenth_in_kb"
are both specified */
+ @VisibleForTesting
+ public static final String TOO_MANY_CHUNK_LENGTH = format("Only one of
'%s' or '%s' may be specified", CHUNK_LENGTH, CHUNK_LENGTH_IN_KB);
private final ICompressor sstableCompressor;
+ /** The chunk length in KB */
private final int chunkLength;
- private final int maxCompressedLength; // In content we store max length
to avoid rounding errors causing compress/decompress mismatch.
- private final double minCompressRatio; // In configuration we store min
ratio, the input parameter.
- private final ImmutableMap<String, String> otherOptions; // Unrecognized
options, can be used by the compressor
+ /**
+ * The compressed length in KB.
+ * In content we store max length to avoid rounding errors causing
compress/decompress mismatch.
+ */
+ private final int maxCompressedLength;
+ /**
+ * The minimum compression ratio.
+ * In configuration we store min ratio, the input parameter.
+ * Ths is mathematically related to chunkLength and maxCompressedLength in
that
+ * # chunk_length / max_compressed_length = min_compress_ratio
+ */
+ private final double minCompressRatio;
+ /** unrecognized options, may contain comressor specific options */
+ private final ImmutableMap<String, String> otherOptions;
- public static CompressionParams fromMap(Map<String, String> opts)
+ public enum CompressorType
{
- Map<String, String> options = copyOptions(opts);
-
- String sstableCompressionClass;
+ lz4(LZ4Compressor.class.getName(), LZ4Compressor::create),
+ noop(NoopCompressor.class.getName(), NoopCompressor::create),
+ snappy(SnappyCompressor.class.getName(), SnappyCompressor::create),
+ deflate(DeflateCompressor.class.getName(), DeflateCompressor::create),
+ zstd(ZstdCompressor.class.getName(), ZstdCompressor::create),
+ none(null, (opt) -> null);
- if (!opts.isEmpty() && isEnabled(opts) && !options.containsKey(CLASS))
- throw new ConfigurationException(format("Missing sub-option '%s'
for the 'compression' option.", CLASS));
+ final String className;
+ final Function<Map<String,String>,ICompressor> creator;
- if (!removeEnabled(options) && !options.isEmpty())
- throw new ConfigurationException(format("If the '%s' option is set
to false no other options must be specified", ENABLED));
- else
- sstableCompressionClass = removeSSTableCompressionClass(options);
-
- int chunkLength = removeChunkLength(options);
- double minCompressRatio = removeMinCompressRatio(options);
+ CompressorType(String className,
Function<Map<String,String>,ICompressor> creator)
+ {
+ this.className = className;
+ this.creator = creator;
+ }
- CompressionParams cp = new CompressionParams(sstableCompressionClass,
options, chunkLength, minCompressRatio);
- cp.validate();
+ static CompressorType fromName(String name)
+ {
+ if (name == null)
+ return null;
- return cp;
- }
+ for (CompressorType type : CompressorType.values())
+ {
+ if (type.name().equals(name) || Objects.equal(type.className,
name))
+ return type;
+ if (type.className != null && (type.className.equals(name) ||
type.className.endsWith("."+name)))
+ return type;
+ }
+ return null;
+ }
- public Class<? extends ICompressor> klass()
- {
- return sstableCompressor.getClass();
+ public ICompressor create(Map<String,String> options) {
+ return creator.apply(options);
+ }
}
- public static CompressionParams noCompression()
+ /**
+ * Gets the default compression params for the keyspace.
+ * This method accounts for issues when the compression params change for
system keyspaces.
+ * @param keyspace the name of the keyspace to get params for. (may be
null for non-System keyspaces)
+ * @return The compresson parameters for the keyspace.
+ */
+ public static CompressionParams defaultParams(String keyspace)
{
- return new CompressionParams(null, DEFAULT_CHUNK_LENGTH,
Integer.MAX_VALUE, 0.0, Collections.emptyMap());
- }
+ if (keyspace != null &&
SchemaConstants.getLocalAndReplicatedSystemKeyspaceNames().contains(keyspace))
+ return DEFAULT;
- // The shorthand methods below are only used for tests. They are a little
inconsistent in their choice of
- // parameters -- this is done on purpose to test out various compression
parameter combinations.
+ CompressionParams result = CALCULATED_DEFAULT;
+ if (result == null)
+ result = CALCULATED_DEFAULT =
fromParameterizedClass(DatabaseDescriptor.getSSTableCompression());
- @VisibleForTesting
- public static CompressionParams snappy()
- {
- return snappy(DEFAULT_CHUNK_LENGTH);
+ return result;
}
- @VisibleForTesting
- public static CompressionParams snappy(int chunkLength)
+ public static CompressionParams fromParameterizedClass(ParameterizedClass
options)
{
- return snappy(chunkLength, 1.1);
- }
+ if (options == null)
+ return DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean() ?
DEFAULT : noCompression();
- @VisibleForTesting
- public static CompressionParams snappy(int chunkLength, double
minCompressRatio)
- {
- return new CompressionParams(SnappyCompressor.instance, chunkLength,
calcMaxCompressedLength(chunkLength, minCompressRatio), minCompressRatio,
Collections.emptyMap());
+ return fromClassAndOptions(options.class_name, options.parameters ==
null ? emptyMap() : copyOptions(options.parameters));
}
- @VisibleForTesting
- public static CompressionParams deflate()
+ public static CompressionParams fromMap(Map<String, String> opts)
{
- return deflate(DEFAULT_CHUNK_LENGTH);
- }
+ Map<String, String> options = copyOptions(opts);
- @VisibleForTesting
- public static CompressionParams deflate(int chunkLength)
- {
- return new CompressionParams(DeflateCompressor.instance, chunkLength,
Integer.MAX_VALUE, 0.0, Collections.emptyMap());
- }
+ String sstableCompressionClass =
removeSSTableCompressionClass(options);
- @VisibleForTesting
- public static CompressionParams lz4()
- {
- return lz4(DEFAULT_CHUNK_LENGTH);
+ return fromClassAndOptions(sstableCompressionClass, options);
}
- @VisibleForTesting
- public static CompressionParams lz4(int chunkLength)
+ public static CompressionParams noCompression()
{
- return lz4(chunkLength, chunkLength);
+ return new CompressionParams(null, DEFAULT_CHUNK_LENGTH,
Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, emptyMap());
}
- @VisibleForTesting
- public static CompressionParams lz4(int chunkLength, int
maxCompressedLength)
+ static int calcMaxCompressedLength(int chunkLength, double
minCompressRatio)
{
- return new
CompressionParams(LZ4Compressor.create(Collections.emptyMap()), chunkLength,
maxCompressedLength, calcMinCompressRatio(chunkLength, maxCompressedLength),
Collections.emptyMap());
+ return (int) Math.ceil(Math.min(chunkLength / minCompressRatio,
Integer.MAX_VALUE));
}
- public static CompressionParams zstd()
+ static double calcMinCompressRatio(int chunkLength, int
maxCompressedLength)
{
- return zstd(DEFAULT_CHUNK_LENGTH);
+ if (maxCompressedLength == Integer.MAX_VALUE)
+ return 0;
+ return chunkLength * 1.0 / maxCompressedLength;
}
- public static CompressionParams zstd(Integer chunkLength)
+ private static CompressionParams fromClassAndOptions(String
sstableCompressionClass, Map<String,String> options)
{
- ZstdCompressor compressor =
ZstdCompressor.create(Collections.emptyMap());
- return new CompressionParams(compressor, chunkLength,
Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap());
+ final boolean enabled = removeEnabled(options);
+
+ if (options.containsKey(CHUNK_LENGTH_IN_KB) &&
options.containsKey(CHUNK_LENGTH))
+ throw new ConfigurationException(format(TOO_MANY_CHUNK_LENGTH,
+ CHUNK_LENGTH_IN_KB,
CHUNK_LENGTH));
+
+ final int chunk_length_in_bytes = removeChunkLength(options);
+
+ // figure out how we calculate the max_compressed_length and
min_compress_ratio
+ if (options.containsKey(MIN_COMPRESS_RATIO) &&
options.containsKey(MAX_COMPRESSED_LENGTH))
+ throw new ConfigurationException(format("Can not specify both '%s'
and '%s' for the compressor parameters.",
+ MIN_COMPRESS_RATIO,
MAX_COMPRESSED_LENGTH));
+
+ // calculate the max_compressed_length and min_compress_ratio
+ int max_compressed_length_in_bytes;
+ double min_compress_ratio;
+ String max_compressed_length_str =
options.remove(MAX_COMPRESSED_LENGTH);
+ if (!StringUtils.isBlank(max_compressed_length_str))
+ {
+ try
+ {
+ max_compressed_length_in_bytes = new
DataStorageSpec.IntKibibytesBound(max_compressed_length_str).toBytes();
+ validateMaxCompressedLength(max_compressed_length_in_bytes,
chunk_length_in_bytes);
+ min_compress_ratio =
CompressionParams.calcMinCompressRatio(chunk_length_in_bytes,
max_compressed_length_in_bytes);
+ } catch (IllegalArgumentException e) {
+ throw new
ConfigurationException(invalidValue(MAX_COMPRESSED_LENGTH, e.getMessage(),
max_compressed_length_str));
+ }
+ }
+ else
+ {
+ min_compress_ratio = removeMinCompressRatio(options);
+ validateMinCompressRatio(min_compress_ratio);
+ max_compressed_length_in_bytes =
CompressionParams.calcMaxCompressedLength(chunk_length_in_bytes,
min_compress_ratio);
+ }
+
+ CompressorType compressorType =
CompressorType.fromName(sstableCompressionClass);
+
+ // if the compressor is not one of the regular ones (e.g. enumerated
in ComnpressorType) create a function to build it.
+ Function<Map<String,String>, ICompressor> creator = compressorType !=
null ? compressorType.creator : (opt) -> {
+ if (sstableCompressionClass != null)
+ {
+ return
newCompressor(parseCompressorClass(sstableCompressionClass), opt);
+ }
+ else
+ {
+ return
newCompressor(parseCompressorClass(defaultParams(null).klass().getName()), opt);
+ }
+ };
+ try
+ {
+ CompressionParams cp = new CompressionParams(enabled ?
creator.apply(options) : null, chunk_length_in_bytes,
max_compressed_length_in_bytes, min_compress_ratio, options);
+ if (enabled && compressorType != CompressorType.none)
+ {
+ ICompressor compressor = cp.sstableCompressor;
+ if (compressor == null)
+ throw new ConfigurationException(format("'%s' is not a
valid compressor class name for the 'class' option.", sstableCompressionClass));
+ else
+ checkCompressorOptions(compressor, options.keySet());
+ }
+
+ cp.validate();
+
+ return cp;
+ } catch (ConfigurationException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
}
- @VisibleForTesting
- public static CompressionParams noop()
+ private static ICompressor newCompressor(Class<?> className, Map<String,
String> parameters) throws ConfigurationException
{
- NoopCompressor compressor =
NoopCompressor.create(Collections.emptyMap());
- return new CompressionParams(compressor, DEFAULT_CHUNK_LENGTH,
Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap());
+ if (className == null)
+ {
+ if (!parameters.isEmpty())
+ throw new ConfigurationException("Unknown compression options
(" + parameters.keySet() + ") since no compression class found");
+ return null;
+ }
+
+ try
+ {
+ Method method = className.getMethod("create", Map.class);
+ ICompressor compressor = (ICompressor) method.invoke(null,
parameters);
+ // Check for unknown options
+ CompressionParams.checkCompressorOptions(compressor,
parameters.keySet());
+ return compressor;
+ }
+ catch (NoSuchMethodException e)
+ {
+ throw new ConfigurationException("create method not found", e);
+ }
+ catch (SecurityException e)
+ {
+ throw new ConfigurationException("Access forbiden", e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new ConfigurationException("Cannot access method create in "
+ className.getName(), e);
+ }
+ catch (InvocationTargetException e)
+ {
+ if (e.getTargetException() instanceof ConfigurationException)
Review Comment:
We have the `Throwables.unwrap` method for this.
##########
src/java/org/apache/cassandra/schema/CompressionParams.java:
##########
@@ -41,163 +47,316 @@
import org.apache.cassandra.net.MessagingService;
import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT;
public final class CompressionParams
{
- public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
+ public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB
public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; //
Since pre-4.0 versions do not understand the
// new
compression parameter we can't use a
//
different default value.
public static final IVersionedSerializer<CompressionParams> serializer =
new Serializer();
public static final String CLASS = "class";
public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+ /** Requires a DataStorageSpec suffix */
+ public static final String CHUNK_LENGTH = "chunk_length";
+ /** Requires a DataStorageSpec suffix */
+ public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length";
public static final String ENABLED = "enabled";
public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
- public static final CompressionParams DEFAULT =
!CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
- ? noCompression()
- : new
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
-
DEFAULT_CHUNK_LENGTH,
-
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-
DEFAULT_MIN_COMPRESS_RATIO,
-
Collections.emptyMap());
-
- public static final CompressionParams NOOP = new
CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+ /** A compressor that does nothing */
+ public static final CompressionParams NOOP = new
CompressionParams(NoopCompressor.create(emptyMap()),
// 4
KiB is often the underlying disk block size
1024 *
4,
Integer.MAX_VALUE,
DEFAULT_MIN_COMPRESS_RATIO,
-
Collections.emptyMap());
+
emptyMap());
+
+ /** (legacy) Default for when no other compression is specified.
+ * You probalby want defaultParams(keyspace) rather than this. */
+ public static final CompressionParams DEFAULT = new
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+
DEFAULT_CHUNK_LENGTH,
+
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+
DEFAULT_MIN_COMPRESS_RATIO,
+
emptyMap());
+
+ /** A guaranteed FAST compressor */
+ public static final CompressionParams FAST = DEFAULT;
+
+ /** The default calculated fromthe config.yaml */
+ private static CompressionParams CALCULATED_DEFAULT;
+
+ /** error message format for when 'chunk_length' and 'chunklenth_in_kb"
are both specified */
+ @VisibleForTesting
+ public static final String TOO_MANY_CHUNK_LENGTH = format("Only one of
'%s' or '%s' may be specified", CHUNK_LENGTH, CHUNK_LENGTH_IN_KB);
private final ICompressor sstableCompressor;
+ /** The chunk length in KB */
private final int chunkLength;
- private final int maxCompressedLength; // In content we store max length
to avoid rounding errors causing compress/decompress mismatch.
- private final double minCompressRatio; // In configuration we store min
ratio, the input parameter.
- private final ImmutableMap<String, String> otherOptions; // Unrecognized
options, can be used by the compressor
+ /**
+ * The compressed length in KB.
+ * In content we store max length to avoid rounding errors causing
compress/decompress mismatch.
+ */
+ private final int maxCompressedLength;
+ /**
+ * The minimum compression ratio.
+ * In configuration we store min ratio, the input parameter.
+ * Ths is mathematically related to chunkLength and maxCompressedLength in
that
+ * # chunk_length / max_compressed_length = min_compress_ratio
+ */
+ private final double minCompressRatio;
+ /** unrecognized options, may contain comressor specific options */
+ private final ImmutableMap<String, String> otherOptions;
- public static CompressionParams fromMap(Map<String, String> opts)
+ public enum CompressorType
{
- Map<String, String> options = copyOptions(opts);
-
- String sstableCompressionClass;
+ lz4(LZ4Compressor.class.getName(), LZ4Compressor::create),
+ noop(NoopCompressor.class.getName(), NoopCompressor::create),
+ snappy(SnappyCompressor.class.getName(), SnappyCompressor::create),
+ deflate(DeflateCompressor.class.getName(), DeflateCompressor::create),
+ zstd(ZstdCompressor.class.getName(), ZstdCompressor::create),
+ none(null, (opt) -> null);
- if (!opts.isEmpty() && isEnabled(opts) && !options.containsKey(CLASS))
- throw new ConfigurationException(format("Missing sub-option '%s'
for the 'compression' option.", CLASS));
+ final String className;
+ final Function<Map<String,String>,ICompressor> creator;
- if (!removeEnabled(options) && !options.isEmpty())
- throw new ConfigurationException(format("If the '%s' option is set
to false no other options must be specified", ENABLED));
- else
- sstableCompressionClass = removeSSTableCompressionClass(options);
-
- int chunkLength = removeChunkLength(options);
- double minCompressRatio = removeMinCompressRatio(options);
+ CompressorType(String className,
Function<Map<String,String>,ICompressor> creator)
+ {
+ this.className = className;
+ this.creator = creator;
+ }
- CompressionParams cp = new CompressionParams(sstableCompressionClass,
options, chunkLength, minCompressRatio);
- cp.validate();
+ static CompressorType fromName(String name)
+ {
+ if (name == null)
+ return null;
- return cp;
- }
+ for (CompressorType type : CompressorType.values())
+ {
+ if (type.name().equals(name) || Objects.equal(type.className,
name))
+ return type;
+ if (type.className != null && (type.className.equals(name) ||
type.className.endsWith("."+name)))
+ return type;
+ }
+ return null;
+ }
- public Class<? extends ICompressor> klass()
- {
- return sstableCompressor.getClass();
+ public ICompressor create(Map<String,String> options) {
+ return creator.apply(options);
+ }
}
- public static CompressionParams noCompression()
+ /**
+ * Gets the default compression params for the keyspace.
+ * This method accounts for issues when the compression params change for
system keyspaces.
+ * @param keyspace the name of the keyspace to get params for. (may be
null for non-System keyspaces)
+ * @return The compresson parameters for the keyspace.
+ */
+ public static CompressionParams defaultParams(String keyspace)
{
- return new CompressionParams(null, DEFAULT_CHUNK_LENGTH,
Integer.MAX_VALUE, 0.0, Collections.emptyMap());
- }
+ if (keyspace != null &&
SchemaConstants.getLocalAndReplicatedSystemKeyspaceNames().contains(keyspace))
+ return DEFAULT;
- // The shorthand methods below are only used for tests. They are a little
inconsistent in their choice of
- // parameters -- this is done on purpose to test out various compression
parameter combinations.
+ CompressionParams result = CALCULATED_DEFAULT;
+ if (result == null)
+ result = CALCULATED_DEFAULT =
fromParameterizedClass(DatabaseDescriptor.getSSTableCompression());
- @VisibleForTesting
- public static CompressionParams snappy()
- {
- return snappy(DEFAULT_CHUNK_LENGTH);
+ return result;
}
- @VisibleForTesting
- public static CompressionParams snappy(int chunkLength)
+ public static CompressionParams fromParameterizedClass(ParameterizedClass
options)
{
- return snappy(chunkLength, 1.1);
- }
+ if (options == null)
+ return DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean() ?
DEFAULT : noCompression();
- @VisibleForTesting
- public static CompressionParams snappy(int chunkLength, double
minCompressRatio)
- {
- return new CompressionParams(SnappyCompressor.instance, chunkLength,
calcMaxCompressedLength(chunkLength, minCompressRatio), minCompressRatio,
Collections.emptyMap());
+ return fromClassAndOptions(options.class_name, options.parameters ==
null ? emptyMap() : copyOptions(options.parameters));
}
- @VisibleForTesting
- public static CompressionParams deflate()
+ public static CompressionParams fromMap(Map<String, String> opts)
{
- return deflate(DEFAULT_CHUNK_LENGTH);
- }
+ Map<String, String> options = copyOptions(opts);
- @VisibleForTesting
- public static CompressionParams deflate(int chunkLength)
- {
- return new CompressionParams(DeflateCompressor.instance, chunkLength,
Integer.MAX_VALUE, 0.0, Collections.emptyMap());
- }
+ String sstableCompressionClass =
removeSSTableCompressionClass(options);
- @VisibleForTesting
- public static CompressionParams lz4()
- {
- return lz4(DEFAULT_CHUNK_LENGTH);
+ return fromClassAndOptions(sstableCompressionClass, options);
}
- @VisibleForTesting
- public static CompressionParams lz4(int chunkLength)
+ public static CompressionParams noCompression()
{
- return lz4(chunkLength, chunkLength);
+ return new CompressionParams(null, DEFAULT_CHUNK_LENGTH,
Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, emptyMap());
}
- @VisibleForTesting
- public static CompressionParams lz4(int chunkLength, int
maxCompressedLength)
+ static int calcMaxCompressedLength(int chunkLength, double
minCompressRatio)
{
- return new
CompressionParams(LZ4Compressor.create(Collections.emptyMap()), chunkLength,
maxCompressedLength, calcMinCompressRatio(chunkLength, maxCompressedLength),
Collections.emptyMap());
+ return (int) Math.ceil(Math.min(chunkLength / minCompressRatio,
Integer.MAX_VALUE));
}
- public static CompressionParams zstd()
+ static double calcMinCompressRatio(int chunkLength, int
maxCompressedLength)
{
- return zstd(DEFAULT_CHUNK_LENGTH);
+ if (maxCompressedLength == Integer.MAX_VALUE)
+ return 0;
+ return chunkLength * 1.0 / maxCompressedLength;
}
- public static CompressionParams zstd(Integer chunkLength)
+ private static CompressionParams fromClassAndOptions(String
sstableCompressionClass, Map<String,String> options)
{
- ZstdCompressor compressor =
ZstdCompressor.create(Collections.emptyMap());
- return new CompressionParams(compressor, chunkLength,
Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap());
+ final boolean enabled = removeEnabled(options);
+
+ if (options.containsKey(CHUNK_LENGTH_IN_KB) &&
options.containsKey(CHUNK_LENGTH))
+ throw new ConfigurationException(format(TOO_MANY_CHUNK_LENGTH,
+ CHUNK_LENGTH_IN_KB,
CHUNK_LENGTH));
+
+ final int chunk_length_in_bytes = removeChunkLength(options);
+
+ // figure out how we calculate the max_compressed_length and
min_compress_ratio
+ if (options.containsKey(MIN_COMPRESS_RATIO) &&
options.containsKey(MAX_COMPRESSED_LENGTH))
+ throw new ConfigurationException(format("Can not specify both '%s'
and '%s' for the compressor parameters.",
+ MIN_COMPRESS_RATIO,
MAX_COMPRESSED_LENGTH));
+
+ // calculate the max_compressed_length and min_compress_ratio
+ int max_compressed_length_in_bytes;
+ double min_compress_ratio;
+ String max_compressed_length_str =
options.remove(MAX_COMPRESSED_LENGTH);
+ if (!StringUtils.isBlank(max_compressed_length_str))
+ {
+ try
+ {
+ max_compressed_length_in_bytes = new
DataStorageSpec.IntKibibytesBound(max_compressed_length_str).toBytes();
+ validateMaxCompressedLength(max_compressed_length_in_bytes,
chunk_length_in_bytes);
+ min_compress_ratio =
CompressionParams.calcMinCompressRatio(chunk_length_in_bytes,
max_compressed_length_in_bytes);
+ } catch (IllegalArgumentException e) {
+ throw new
ConfigurationException(invalidValue(MAX_COMPRESSED_LENGTH, e.getMessage(),
max_compressed_length_str));
+ }
+ }
+ else
+ {
+ min_compress_ratio = removeMinCompressRatio(options);
+ validateMinCompressRatio(min_compress_ratio);
+ max_compressed_length_in_bytes =
CompressionParams.calcMaxCompressedLength(chunk_length_in_bytes,
min_compress_ratio);
+ }
+
+ CompressorType compressorType =
CompressorType.fromName(sstableCompressionClass);
+
+ // if the compressor is not one of the regular ones (e.g. enumerated
in ComnpressorType) create a function to build it.
+ Function<Map<String,String>, ICompressor> creator = compressorType !=
null ? compressorType.creator : (opt) -> {
+ if (sstableCompressionClass != null)
+ {
+ return
newCompressor(parseCompressorClass(sstableCompressionClass), opt);
+ }
+ else
+ {
+ return
newCompressor(parseCompressorClass(defaultParams(null).klass().getName()), opt);
+ }
+ };
+ try
+ {
+ CompressionParams cp = new CompressionParams(enabled ?
creator.apply(options) : null, chunk_length_in_bytes,
max_compressed_length_in_bytes, min_compress_ratio, options);
+ if (enabled && compressorType != CompressorType.none)
+ {
+ ICompressor compressor = cp.sstableCompressor;
+ if (compressor == null)
+ throw new ConfigurationException(format("'%s' is not a
valid compressor class name for the 'class' option.", sstableCompressionClass));
+ else
+ checkCompressorOptions(compressor, options.keySet());
+ }
+
+ cp.validate();
+
+ return cp;
+ } catch (ConfigurationException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
}
- @VisibleForTesting
- public static CompressionParams noop()
+ private static ICompressor newCompressor(Class<?> className, Map<String,
String> parameters) throws ConfigurationException
{
- NoopCompressor compressor =
NoopCompressor.create(Collections.emptyMap());
- return new CompressionParams(compressor, DEFAULT_CHUNK_LENGTH,
Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap());
+ if (className == null)
+ {
+ if (!parameters.isEmpty())
+ throw new ConfigurationException("Unknown compression options
(" + parameters.keySet() + ") since no compression class found");
+ return null;
+ }
+
+ try
+ {
+ Method method = className.getMethod("create", Map.class);
+ ICompressor compressor = (ICompressor) method.invoke(null,
parameters);
+ // Check for unknown options
+ CompressionParams.checkCompressorOptions(compressor,
parameters.keySet());
+ return compressor;
+ }
+ catch (NoSuchMethodException e)
Review Comment:
I don't see a reason to catch each of those exceptions in its own block.
To me it would be enough to catch all exceptions in one block, unwrap,
and rethrow as `ConfigurationException` if it isn't one already.
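A rough sketch of the single-catch shape, under stated assumptions:
`SketchConfigurationException` stands in for the real `ConfigurationException`,
the local `unwrap` helper stands in for the project's unwrap utility (whose
exact signature is not assumed here), and `GoodFactory` / `BadFactory` are
made-up classes used only to exercise the code.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical stand-in for the project's ConfigurationException.
class SketchConfigurationException extends RuntimeException
{
    SketchConfigurationException(String message, Throwable cause)
    {
        super(message, cause);
    }
}

final class CompressorFactorySketch
{
    // Local helper (an assumption, not the project's utility): peel off reflection wrappers.
    static Throwable unwrap(Throwable t)
    {
        while (t instanceof InvocationTargetException && t.getCause() != null)
            t = t.getCause();
        return t;
    }

    // Calls the static create(Map) factory on klass; every failure goes through one catch block.
    static Object newCompressor(Class<?> klass, Map<String, String> parameters)
    {
        try
        {
            Method method = klass.getMethod("create", Map.class);
            return method.invoke(null, parameters);
        }
        catch (Exception e)
        {
            Throwable cause = unwrap(e);
            // Rethrow as-is if the factory already raised a configuration error.
            if (cause instanceof SketchConfigurationException)
                throw (SketchConfigurationException) cause;
            throw new SketchConfigurationException("Cannot create compressor " + klass.getName() + ": " + cause, cause);
        }
    }
}

// Made-up factories used only to exercise the sketch.
final class GoodFactory
{
    public static Object create(Map<String, String> options)
    {
        return "ok:" + options.size();
    }
}

final class BadFactory
{
    public static Object create(Map<String, String> options)
    {
        throw new SketchConfigurationException("bad option", null);
    }
}
```

With this shape, a missing `create` method, an access problem, and an
exception thrown inside `create` all end up as a configuration error, and an
error that is already a configuration exception keeps its original message.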