Mmuzaf commented on code in PR #3330:
URL: https://github.com/apache/cassandra/pull/3330#discussion_r1759158749


##########
test/unit/org/apache/cassandra/schema/TestCompressionParamsFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.util.Map;
+
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH;
+import static 
org.apache.cassandra.schema.CompressionParams.DEFAULT_MIN_COMPRESS_RATIO;
+
+/**
+ * Contains simple constructors for various Compression implementations.
+ * They are a little inconsistent in their choice of parameters -- this is 
done on purpose to test out various compression parameter combinations.
+ */
+public class TestCompressionParamsFactory

Review Comment:
   I'm going to be a bit of a nerd, but this class doesn't seem to have 
anything to do with the changes we're going to be making. 
   Concerns are the following:
   - Looks like refactoring doesn't need to be done here, as the changes are 
not directly related to the issue
   -the `CompressionParams` and default constructors might be used for 
dependent projects, so moving these methods to a test class might break the 
compilation there;
   - Renaming to the `TestCompressionParamsFactory` generates a lot of code 
changes;
   
   So I would suggest rolling back those changes for now and creating another 
LHF issue to do that. 
   
   
   



##########
doc/modules/cassandra/pages/developing/cql/cql_singlefile.adoc:
##########
@@ -678,17 +678,17 @@ to the risk the jvm must also be started with
 
 For the `compression` property, the following sub-options are available:
 
-[cols=",,,,,",options="header",]
+[cols=",,",options="header",]
 |===
-|option |default |description | | |
+|option |default |description

Review Comment:
   Should we also update these docs with all the compression classes we support?



##########
test/conf/cassandra-with-sstable-compressor.yaml:
##########
@@ -0,0 +1,113 @@
+
+cluster_name: Test Cluster
+# memtable_allocation_type: heap_buffers
+memtable_allocation_type: offheap_objects
+commitlog_sync: batch
+#commitlog_sync_batch_window_in_ms: 1.0
+commitlog_segment_size: 5MiB
+commitlog_directory: build/test/cassandra/commitlog
+# commitlog_compression:
+# - class_name: LZ4Compressor
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
+hints_directory: build/test/cassandra/hints
+partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
+listen_address: 127.0.0.1
+storage_port: 7012
+ssl_storage_port: 17012
+start_native_transport: true
+native_transport_port: 9042
+column_index_size: 4KiB
+saved_caches_directory: build/test/cassandra/saved_caches
+data_file_directories:
+    - build/test/cassandra/data
+disk_access_mode: mmap
+seed_provider:
+    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+      parameters:
+          - seeds: "127.0.0.1:7012"
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+dynamic_snitch: true
+server_encryption_options:
+    internode_encryption: none
+    keystore: conf/.keystore
+    keystore_password: cassandra
+    truststore: conf/.truststore
+    truststore_password: cassandra
+incremental_backups: true
+concurrent_compactors: 4
+compaction_throughput: 0MiB/s
+row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+row_cache_size: 16MiB
+user_defined_functions_enabled: true
+scripted_user_defined_functions_enabled: true
+prepared_statements_cache_size: 1MiB
+corrupted_tombstone_strategy: exception
+stream_entire_sstables: true
+stream_throughput_outbound: 23841858MiB/s
+sasi_indexes_enabled: true
+materialized_views_enabled: true
+drop_compact_storage_enabled: true
+file_cache_enabled: true
+auto_hints_cleanup_enabled: true
+
+read_thresholds_enabled: true
+coordinator_read_size_warn_threshold: 1024KiB
+coordinator_read_size_fail_threshold: 4096KiB
+local_read_size_warn_threshold: 4096KiB
+local_read_size_fail_threshold: 8192KiB
+row_index_read_size_warn_threshold: 4096KiB
+row_index_read_size_fail_threshold: 8192KiB
+
+memtable:
+    configurations:
+        skiplist:
+            inherits: default
+            class_name: SkipListMemtable
+        skiplist_sharded:
+            class_name: ShardedSkipListMemtable
+            parameters:
+                serialize_writes: false
+                shards: 4
+        skiplist_sharded_locking:
+            inherits: skiplist_sharded
+            parameters:
+                serialize_writes: true
+        skiplist_remapped:
+            inherits: skiplist
+        test_fullname:
+            inherits: default
+            class_name: org.apache.cassandra.db.memtable.TestMemtable
+        test_shortname:
+            class_name: TestMemtable
+            parameters:
+                skiplist: true  # note: YAML must interpret this as string, 
not a boolean
+        test_empty_class:
+            class_name: ""
+        test_missing_class:
+            parameters:
+        test_unknown_class:
+            class_name: NotExisting
+        test_invalid_param:
+            class_name: SkipListMemtable
+            parameters:
+                invalid: throw
+        test_invalid_extra_param:
+            inherits: test_shortname
+            parameters:
+                invalid: throw
+        test_invalid_factory_method:
+            class_name: 
org.apache.cassandra.cql3.validation.operations.CreateTest$InvalidMemtableFactoryMethod
+        test_invalid_factory_field:
+            class_name: 
org.apache.cassandra.cql3.validation.operations.CreateTest$InvalidMemtableFactoryField
+
+sstable:
+    compression:
+        configurations:

Review Comment:
   This is probably not related to this issue and we should create a follow-up 
issue for the following question: Should we disallow the case where we list 
customized compression configurations and yet have no 'default' configuration?



##########
src/java/org/apache/cassandra/schema/CompressionParams.java:
##########
@@ -20,184 +20,355 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.InheritingClass;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Throwables;
 
 import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT;
 
 public final class CompressionParams
 {
-    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
+    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB
     public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0;        // 
Since pre-4.0 versions do not understand the
-                                                                        // new 
compression parameter we can't use a
-                                                                        // 
different default value.
+    // new compression parameter we can't use a
+    // different default value.
     public static final IVersionedSerializer<CompressionParams> serializer = 
new Serializer();
 
     public static final String CLASS = "class";
     public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+
+    /**
+     * Requires a DataStorageSpec suffix
+     */
+    public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length";
     public static final String ENABLED = "enabled";
     public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
 
-    public static final CompressionParams DEFAULT = 
!CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
-                                                    ? noCompression()
-                                                    : new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
-                                                                            
DEFAULT_CHUNK_LENGTH,
-                                                                            
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-                                                                            
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                            
Collections.emptyMap());
-
-    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(emptyMap()),
                                                                        // 4 
KiB is often the underlying disk block size
                                                                        1024 * 
4,
                                                                        
Integer.MAX_VALUE,
                                                                        
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                       
Collections.emptyMap());
+                                                                       
emptyMap());
 
-    private final ICompressor sstableCompressor;
-    private final int chunkLength;
-    private final int maxCompressedLength;  // In content we store max length 
to avoid rounding errors causing compress/decompress mismatch.
-    private final double minCompressRatio;  // In configuration we store min 
ratio, the input parameter.
-    private final ImmutableMap<String, String> otherOptions; // Unrecognized 
options, can be used by the compressor
+    /**
+     * (legacy) Default for when no other compression is specified
+     */
+    public static final CompressionParams DEFAULT = new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+                                                                          
DEFAULT_CHUNK_LENGTH,
+                                                                          
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+                                                                          
DEFAULT_MIN_COMPRESS_RATIO,
+                                                                          
emptyMap());
 
-    public static CompressionParams fromMap(Map<String, String> opts)
-    {
-        Map<String, String> options = copyOptions(opts);
+    /**
+     * A guaranteed FAST compressor
+     */
+    public static final CompressionParams FAST = DEFAULT;

Review Comment:
   I think we should either remove the `FAST` and use the `DEFAULT` everywhere 
or rename it to `LZ4_FAST_COMPRESSOR` to make it more clear. 



##########
src/java/org/apache/cassandra/schema/CompressionParams.java:
##########
@@ -20,184 +20,355 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.InheritingClass;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Throwables;
 
 import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT;
 
 public final class CompressionParams
 {
-    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
+    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB
     public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0;        // 
Since pre-4.0 versions do not understand the
-                                                                        // new 
compression parameter we can't use a
-                                                                        // 
different default value.
+    // new compression parameter we can't use a
+    // different default value.
     public static final IVersionedSerializer<CompressionParams> serializer = 
new Serializer();
 
     public static final String CLASS = "class";
     public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+
+    /**
+     * Requires a DataStorageSpec suffix
+     */
+    public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length";
     public static final String ENABLED = "enabled";
     public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
 
-    public static final CompressionParams DEFAULT = 
!CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
-                                                    ? noCompression()
-                                                    : new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
-                                                                            
DEFAULT_CHUNK_LENGTH,
-                                                                            
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-                                                                            
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                            
Collections.emptyMap());
-
-    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(emptyMap()),
                                                                        // 4 
KiB is often the underlying disk block size
                                                                        1024 * 
4,
                                                                        
Integer.MAX_VALUE,
                                                                        
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                       
Collections.emptyMap());
+                                                                       
emptyMap());
 
-    private final ICompressor sstableCompressor;
-    private final int chunkLength;
-    private final int maxCompressedLength;  // In content we store max length 
to avoid rounding errors causing compress/decompress mismatch.
-    private final double minCompressRatio;  // In configuration we store min 
ratio, the input parameter.
-    private final ImmutableMap<String, String> otherOptions; // Unrecognized 
options, can be used by the compressor
+    /**
+     * (legacy) Default for when no other compression is specified
+     */
+    public static final CompressionParams DEFAULT = new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+                                                                          
DEFAULT_CHUNK_LENGTH,
+                                                                          
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+                                                                          
DEFAULT_MIN_COMPRESS_RATIO,
+                                                                          
emptyMap());
 
-    public static CompressionParams fromMap(Map<String, String> opts)
-    {
-        Map<String, String> options = copyOptions(opts);
+    /**
+     * A guaranteed FAST compressor
+     */
+    public static final CompressionParams FAST = DEFAULT;
 
-        String sstableCompressionClass;
+    /**
+     * The default calculated from cassandra.yaml
+     */
+    private static CompressionParams CALCULATED_DEFAULT;
 
-        if (!opts.isEmpty() && isEnabled(opts) && !options.containsKey(CLASS))
-            throw new ConfigurationException(format("Missing sub-option '%s' 
for the 'compression' option.", CLASS));
+    private final ICompressor sstableCompressor;
+    /**
+     * The chunk length in KB
+     */
+    private final int chunkLength;
+    /**
+     * The compressed length in KB.
+     * In content we store max length to avoid rounding errors causing 
compress/decompress mismatch.
+     */
+    private final int maxCompressedLength;
+    /**
+     * The minimum compression ratio.
+     * In configuration, we store min ratio, the input parameter.
+     * Ths is mathematically related to chunkLength and maxCompressedLength in 
that
+     * # chunk_length / max_compressed_length = min_compress_ratio
+     */
+    private final double minCompressRatio;
+    /**
+     * unrecognized options, may contain compressor specific options
+     */
+    private final ImmutableMap<String, String> otherOptions;
 
-        if (!removeEnabled(options) && !options.isEmpty())
-            throw new ConfigurationException(format("If the '%s' option is set 
to false no other options must be specified", ENABLED));
-        else
-            sstableCompressionClass = removeSSTableCompressionClass(options);
+    /**
+     * Gets the default compression params for the keyspace.
+     * This method accounts for issues when the compression params change for 
system keyspaces.
+     *
+     * @param keyspace the name of the keyspace to get params for. (may be 
null for non-System keyspaces)
+     * @return The compresson parameters for the keyspace.
+     */
+    public static CompressionParams defaultParams(String keyspace)
+    {
+        if (keyspace != null && 
SchemaConstants.getLocalAndReplicatedSystemKeyspaceNames().contains(keyspace))
+            return DEFAULT;
 
-        int chunkLength = removeChunkLength(options);
-        double minCompressRatio = removeMinCompressRatio(options);
+        CompressionParams result = CALCULATED_DEFAULT;

Review Comment:
   Saving the calculated default from the config doesn't make sense to me 
either. From my point of view the process should look like this:
   
   1. We parse the yaml file
   2. If the yaml contains compression params, _then_ set them to the 
`Config.SSTableConfig`
   3. If the yaml doesn't have compression params, _then_ initialize the 
`Config.SSTableConfig` with the defaults from the `CompressionParams DEFAULT`.
   4. When we want create a `CompressonParams` for a keyspace e.g. by calling 
`defaultParams(String keyspace)` we always read the configuration stored in 
`Config.SSTableConfig` and convert it to CompressonParams inside the 
`defaultParams` method.
   
   It seems that such a technique could remove a lot of null-checks in this 
method and all calculated defaults stored out of the Config class. It also 
seems that will simplify the code a lot. WDYT?



##########
test/distributed/org/apache/cassandra/distributed/test/SSTableCompressionTest.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.SSTableConfig;
+import org.apache.cassandra.config.InheritingClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.service.StorageService;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
+import static 
org.apache.cassandra.schema.SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES;
+import static 
org.apache.cassandra.schema.SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES;
+import static org.apache.commons.io.FileUtils.readFileToString;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableCompressionTest

Review Comment:
   We should also add some tests for the `system_views.settings' table, which 
will return the current configuration used after initializing a node, e.g. the 
default compression configuration if the config is omitted in the yaml file, 
and the custom configuration read from the yaml.
   
   The following query currently returns `null` config for the default config:
   `SELECT * FROM vts.settings WHERE name = 'sstable.compression.configurations`
   
   



##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -252,6 +254,8 @@ public class DatabaseDescriptor
                                                                                
                            ? new CommitLogSegmentManagerCDC(c, 
DatabaseDescriptor.getCommitLogLocation())
                                                                                
                            : new CommitLogSegmentManagerStandard(c, 
DatabaseDescriptor.getCommitLogLocation());
 
+    private static Map<String, ParameterizedClass> 
defaultSSTableCompressionConfigs = new HashMap<>();

Review Comment:
   As far as I understand, if the configuration is not set in the yaml file 
we'll initialize this property with a default configuration and leave 
`Config.SSTableConfig` empty.
   
   When we traverse the `Config` class to expose the running configuration to a 
user e.g. via the settings virtual table it won't see any compression 
configuration that has been initialized by default, and this is a bad thing. 
The Config class must be always consistent in terms of which configuration we 
are running. 
   
   This is an example of the issue we previously had to deal with:
   https://issues.apache.org/jira/browse/CASSANDRA-17738
   
   
   
   



##########
src/java/org/apache/cassandra/schema/CompressionParams.java:
##########
@@ -20,184 +20,355 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.InheritingClass;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Throwables;
 
 import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT;
 
 public final class CompressionParams
 {
-    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
+    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB
     public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0;        // 
Since pre-4.0 versions do not understand the
-                                                                        // new 
compression parameter we can't use a
-                                                                        // 
different default value.
+    // new compression parameter we can't use a
+    // different default value.
     public static final IVersionedSerializer<CompressionParams> serializer = 
new Serializer();
 
     public static final String CLASS = "class";
     public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+
+    /**
+     * Requires a DataStorageSpec suffix
+     */
+    public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length";
     public static final String ENABLED = "enabled";
     public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
 
-    public static final CompressionParams DEFAULT = 
!CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
-                                                    ? noCompression()
-                                                    : new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
-                                                                            
DEFAULT_CHUNK_LENGTH,
-                                                                            
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-                                                                            
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                            
Collections.emptyMap());
-
-    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(emptyMap()),
                                                                        // 4 
KiB is often the underlying disk block size
                                                                        1024 * 
4,
                                                                        
Integer.MAX_VALUE,
                                                                        
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                       
Collections.emptyMap());
+                                                                       
emptyMap());
 
-    private final ICompressor sstableCompressor;
-    private final int chunkLength;
-    private final int maxCompressedLength;  // In content we store max length 
to avoid rounding errors causing compress/decompress mismatch.
-    private final double minCompressRatio;  // In configuration we store min 
ratio, the input parameter.
-    private final ImmutableMap<String, String> otherOptions; // Unrecognized 
options, can be used by the compressor
+    /**
+     * (legacy) Default for when no other compression is specified
+     */
+    public static final CompressionParams DEFAULT = new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+                                                                          
DEFAULT_CHUNK_LENGTH,
+                                                                          
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+                                                                          
DEFAULT_MIN_COMPRESS_RATIO,
+                                                                          
emptyMap());
 
-    public static CompressionParams fromMap(Map<String, String> opts)
-    {
-        Map<String, String> options = copyOptions(opts);
+    /**
+     * A guaranteed FAST compressor
+     */
+    public static final CompressionParams FAST = DEFAULT;
 
-        String sstableCompressionClass;
+    /**
+     * The default calculated from cassandra.yaml
+     */
+    private static CompressionParams CALCULATED_DEFAULT;
 
-        if (!opts.isEmpty() && isEnabled(opts) && !options.containsKey(CLASS))
-            throw new ConfigurationException(format("Missing sub-option '%s' 
for the 'compression' option.", CLASS));
+    private final ICompressor sstableCompressor;
+    /**
+     * The chunk length in KB
+     */
+    private final int chunkLength;
+    /**
+     * The compressed length in KB.
+     * In content we store max length to avoid rounding errors causing 
compress/decompress mismatch.
+     */
+    private final int maxCompressedLength;
+    /**
+     * The minimum compression ratio.
+     * In configuration, we store min ratio, the input parameter.
+     * Ths is mathematically related to chunkLength and maxCompressedLength in 
that
+     * # chunk_length / max_compressed_length = min_compress_ratio
+     */
+    private final double minCompressRatio;
+    /**
+     * unrecognized options, may contain compressor specific options
+     */
+    private final ImmutableMap<String, String> otherOptions;
 
-        if (!removeEnabled(options) && !options.isEmpty())
-            throw new ConfigurationException(format("If the '%s' option is set 
to false no other options must be specified", ENABLED));
-        else
-            sstableCompressionClass = removeSSTableCompressionClass(options);
+    /**
+     * Gets the default compression params for the keyspace.
+     * This method accounts for issues when the compression params change for 
system keyspaces.
+     *
+     * @param keyspace the name of the keyspace to get params for. (may be 
null for non-System keyspaces)
+     * @return The compresson parameters for the keyspace.
+     */
+    public static CompressionParams defaultParams(String keyspace)
+    {
+        if (keyspace != null && 
SchemaConstants.getLocalAndReplicatedSystemKeyspaceNames().contains(keyspace))
+            return DEFAULT;
 
-        int chunkLength = removeChunkLength(options);
-        double minCompressRatio = removeMinCompressRatio(options);
+        CompressionParams result = CALCULATED_DEFAULT;
+        if (result == null)
+        {
+            if (DatabaseDescriptor.getRawConfig() == null)

Review Comment:
   Should we throw an exception in this case if the config is not initialized 
when this method is called? 



##########
src/java/org/apache/cassandra/schema/CompressionParams.java:
##########
@@ -20,184 +20,355 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.InheritingClass;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Throwables;
 
 import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT;
 
 public final class CompressionParams
 {
-    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
+    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB
     public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0;        // 
Since pre-4.0 versions do not understand the
-                                                                        // new 
compression parameter we can't use a
-                                                                        // 
different default value.
+    // new compression parameter we can't use a
+    // different default value.
     public static final IVersionedSerializer<CompressionParams> serializer = 
new Serializer();
 
     public static final String CLASS = "class";
     public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+
+    /**
+     * Requires a DataStorageSpec suffix
+     */
+    public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length";
     public static final String ENABLED = "enabled";
     public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
 
-    public static final CompressionParams DEFAULT = 
!CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
-                                                    ? noCompression()
-                                                    : new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
-                                                                            
DEFAULT_CHUNK_LENGTH,
-                                                                            
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-                                                                            
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                            
Collections.emptyMap());
-
-    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(emptyMap()),
                                                                        // 4 
KiB is often the underlying disk block size
                                                                        1024 * 
4,
                                                                        
Integer.MAX_VALUE,
                                                                        
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                       
Collections.emptyMap());
+                                                                       
emptyMap());
 
-    private final ICompressor sstableCompressor;
-    private final int chunkLength;
-    private final int maxCompressedLength;  // In content we store max length 
to avoid rounding errors causing compress/decompress mismatch.
-    private final double minCompressRatio;  // In configuration we store min 
ratio, the input parameter.
-    private final ImmutableMap<String, String> otherOptions; // Unrecognized 
options, can be used by the compressor
+    /**
+     * (legacy) Default for when no other compression is specified
+     */
+    public static final CompressionParams DEFAULT = new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+                                                                          
DEFAULT_CHUNK_LENGTH,
+                                                                          
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+                                                                          
DEFAULT_MIN_COMPRESS_RATIO,
+                                                                          
emptyMap());
 
-    public static CompressionParams fromMap(Map<String, String> opts)
-    {
-        Map<String, String> options = copyOptions(opts);
+    /**
+     * A guaranteed FAST compressor
+     */
+    public static final CompressionParams FAST = DEFAULT;
 
-        String sstableCompressionClass;
+    /**
+     * The default calculated from cassandra.yaml
+     */
+    private static CompressionParams CALCULATED_DEFAULT;
 
-        if (!opts.isEmpty() && isEnabled(opts) && !options.containsKey(CLASS))
-            throw new ConfigurationException(format("Missing sub-option '%s' 
for the 'compression' option.", CLASS));
+    private final ICompressor sstableCompressor;
+    /**
+     * The chunk length in KB
+     */
+    private final int chunkLength;
+    /**
+     * The compressed length in KB.
+     * In content we store max length to avoid rounding errors causing 
compress/decompress mismatch.
+     */
+    private final int maxCompressedLength;
+    /**
+     * The minimum compression ratio.
+     * In configuration, we store min ratio, the input parameter.
+     * Ths is mathematically related to chunkLength and maxCompressedLength in 
that
+     * # chunk_length / max_compressed_length = min_compress_ratio
+     */
+    private final double minCompressRatio;
+    /**
+     * unrecognized options, may contain compressor specific options
+     */
+    private final ImmutableMap<String, String> otherOptions;
 
-        if (!removeEnabled(options) && !options.isEmpty())
-            throw new ConfigurationException(format("If the '%s' option is set 
to false no other options must be specified", ENABLED));
-        else
-            sstableCompressionClass = removeSSTableCompressionClass(options);
+    /**
+     * Gets the default compression params for the keyspace.
+     * This method accounts for issues when the compression params change for 
system keyspaces.
+     *
+     * @param keyspace the name of the keyspace to get params for. (may be 
null for non-System keyspaces)
+     * @return The compresson parameters for the keyspace.
+     */
+    public static CompressionParams defaultParams(String keyspace)
+    {
+        if (keyspace != null && 
SchemaConstants.getLocalAndReplicatedSystemKeyspaceNames().contains(keyspace))
+            return DEFAULT;
 
-        int chunkLength = removeChunkLength(options);
-        double minCompressRatio = removeMinCompressRatio(options);
+        CompressionParams result = CALCULATED_DEFAULT;
+        if (result == null)
+        {
+            if (DatabaseDescriptor.getRawConfig() == null)
+            {
+                result = CALCULATED_DEFAULT = CompressionParams.DEFAULT;
+                return result;
+            }
 
-        CompressionParams cp = new CompressionParams(sstableCompressionClass, 
options, chunkLength, minCompressRatio);
-        cp.validate();
+            ParameterizedClass defaultSSTableCompression = 
DatabaseDescriptor.getDefaultSSTableCompressionConfigs().get(InheritingClass.DEFAULT_CONFIGURATION_KEY);
+            if (defaultSSTableCompression == null)
+                result = CALCULATED_DEFAULT = DEFAULT;
+            else
+                result = CALCULATED_DEFAULT = 
fromParameterizedClass(defaultSSTableCompression);
+        }
 
-        return cp;
+        return result;
     }
 
-    public Class<? extends ICompressor> klass()
+    public static CompressionParams fromParameterizedClass(ParameterizedClass 
options)

Review Comment:
   I might be missing something, but this method probably should have the 
following signature:
   `public static CompressionParams 
fromParameterizedClass(SSTableCompressionConfig config)`



##########
doc/modules/cassandra/pages/developing/cql/cql_singlefile.adoc:
##########
@@ -678,17 +678,17 @@ to the risk the jvm must also be started with
 
 For the `compression` property, the following sub-options are available:
 
-[cols=",,,,,",options="header",]
+[cols=",,",options="header",]
 |===
-|option |default |description | | |
+|option |default |description
 |`class` |LZ4Compressor |The compression algorithm to use. Default
 compressor are: LZ4Compressor, SnappyCompressor and DeflateCompressor.
 Use `'enabled' : false` to disable compression. Custom compressor can be
 provided by specifying the full class name as a link:#constants[string
-constant]. | | |
-
+constant].
 |`enabled` |true |By default compression is enabled. To disable it, set
-`enabled` to `false` |`chunk_length_in_kb` |64KB |On disk SSTables are
+`enabled` to `false`
+|`chunk_length_in_kb` |16 |On disk SSTables are

Review Comment:
   It looks like we can update the default in the `compress-subproperties.adoc` 
as well. 
   
   
https://github.com/instaclustr/cassandra/blob/CASSANDRA-12937-squashed/doc/modules/cassandra/partials/compress-subproperties.adoc?plain=1#L59



##########
src/java/org/apache/cassandra/io/sstable/format/DataComponent.java:
##########
@@ -86,12 +86,17 @@ private static CompressionParams 
buildCompressionParams(TableMetadata metadata,
                     compressionParams = CompressionParams.NOOP;
                     break;
                 case fast:
-                    if 
(!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
+                    // The default compressor is generally Fast, but just in 
case we verify it.
+                    CompressionParams defaultCompressionParams = 
CompressionParams.defaultParams(metadata.keyspace);
+                    ICompressor maybeFastCompressor = 
defaultCompressionParams.getSstableCompressor();
+                    if 
(maybeFastCompressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
 {
+                        compressionParams = defaultCompressionParams;
+                    }
+                    else
                     {
-                        // The default compressor is generally fast (LZ4 with 
16KiB block size)
-                        compressionParams = CompressionParams.DEFAULT;
-                        break;
+                        compressionParams = CompressionParams.FAST;

Review Comment:
   Please add a test to the `CompressionParamsTest` that checks if the 
`CompressionParams.DEFAULT` uses the `LZ4Compressor` "fast" by default or an 
assertion here. The current assumption that it's "fast" is fragile. The latter 
is better.



##########
src/java/org/apache/cassandra/io/sstable/format/DataComponent.java:
##########
@@ -86,12 +86,17 @@ private static CompressionParams 
buildCompressionParams(TableMetadata metadata,
                     compressionParams = CompressionParams.NOOP;
                     break;
                 case fast:
-                    if 
(!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
+                    // The default compressor is generally Fast, but just in 
case we verify it.
+                    CompressionParams defaultCompressionParams = 
CompressionParams.defaultParams(metadata.keyspace);
+                    ICompressor maybeFastCompressor = 
defaultCompressionParams.getSstableCompressor();
+                    if 
(maybeFastCompressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
 {

Review Comment:
   I would fix this logic. For the system keyspaces, we should not change the 
default compression at all as the defaults might change, regardless of whether 
it is "fast" for the system keyspaces or not.



##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1630,22 +1634,44 @@ else if (conf != null && 
conf.storage_compatibility_mode != null)
             storageCompatibilityMode = conf.storage_compatibility_mode;
     }
 
-    private static void applySSTableFormats()
+    private static void applySSTableConfig()
     {
         ServiceLoader<SSTableFormat.Factory> loader = 
ServiceLoader.load(SSTableFormat.Factory.class, 
DatabaseDescriptor.class.getClassLoader());
         List<SSTableFormat.Factory> factories = Iterables.toList(loader);
         if (factories.isEmpty())
             factories = ImmutableList.of(new BigFormat.BigFormatFactory());
-        applySSTableFormats(factories, conf.sstable);
+        applySSTableConfig(factories, conf.sstable);
     }
 
-    private static void applySSTableFormats(Iterable<SSTableFormat.Factory> 
factories, Config.SSTableConfig sstableFormatsConfig)
+    private static void applySSTableConfig(Iterable<SSTableFormat.Factory> 
factories, Config.SSTableConfig sstableConfig)
     {
+        InheritingClass defaultParameterizedClass = new InheritingClass(null, 
CompressionParams.DEFAULT.klass().getName(), 
CompressionParams.DEFAULT.getOtherOptions());
+
+        if (sstableConfig.compression == null || 
sstableConfig.compression.configurations == null)
+            
setDefaultSSTableCompressionConfigs(ImmutableMap.of(InheritingClass.DEFAULT_CONFIGURATION_KEY,
 defaultParameterizedClass));

Review Comment:
   The `applyConfig*` methods from my point of view mean that we apply the 
defaults to the Config class instance itself, and the use this initialized 
defaults, specifically the `SSTableCompressionConfig` throughout the code. I 
don't think we should store the intermediate calculations in local variables or 
anywhere outside of the Config.



##########
test/distributed/org/apache/cassandra/distributed/test/SSTableCompressionTest.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.SSTableConfig;
+import org.apache.cassandra.config.InheritingClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.service.StorageService;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
+import static 
org.apache.cassandra.schema.SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES;
+import static 
org.apache.cassandra.schema.SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES;
+import static org.apache.commons.io.FileUtils.readFileToString;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableCompressionTest
+{
+    private Cluster cluster;
+
+    private static final String FAST_PARAM = "WITH compression = {'class': 
'SnappyCompressor'}";
+    private static final String SLOW_PARAM = "WITH compression = {'class': 
'DeflateCompressor'}";
+    private static final String DEFAULT_PARAM = "";
+
+    private static final String KEYSPACE = "sstable_compression_test";
+
+    private static final Consumer<IInstanceConfig> DEFAULT_CONFIG = c -> {
+        c.with(NATIVE_PROTOCOL, NETWORK, GOSSIP);
+        c.set("flush_compression", "fast");
+        SSTableConfig config = new SSTableConfig();
+        config.compression = new Config.SSTableCompressionConfig();
+        config.compression.configurations = new LinkedHashMap<>();
+        config.compression.configurations.put("lz4", new InheritingClass(null, 
LZ4Compressor.class.getName(), new HashMap<>()));
+        config.compression.configurations.put("default", new 
InheritingClass("lz4", null, null));
+        c.set("sstable", config);
+    };
+
+    private static final Consumer<IInstanceConfig> SLOW_CONFIG = c -> {
+        c.with(NATIVE_PROTOCOL, NETWORK, GOSSIP);
+        c.set("flush_compression", "fast");
+        SSTableConfig config = new SSTableConfig();
+        config.compression = new Config.SSTableCompressionConfig();
+        config.compression.configurations = new LinkedHashMap<>();
+        config.compression.configurations.put("deflate", new 
InheritingClass(null, DeflateCompressor.class.getName(), new HashMap<>()));
+        config.compression.configurations.put("default", new 
InheritingClass("deflate", null, null));
+        c.set("sstable", config);
+    };
+
+    private static final Consumer<IInstanceConfig> ZSTD_CONFIG = c -> {
+        c.with(NATIVE_PROTOCOL, NETWORK, GOSSIP);
+        c.set("flush_compression", "fast");
+        SSTableConfig config = new SSTableConfig();
+        config.compression = new Config.SSTableCompressionConfig();
+        config.compression.configurations = new LinkedHashMap<>();
+        config.compression.configurations.put("zstd", new 
InheritingClass(null, ZstdCompressor.class.getName(), new HashMap<>()));
+        config.compression.configurations.put("default", new 
InheritingClass("zstd", null, null));
+        c.set("sstable", config);
+    };
+
+    private static final Consumer<IInstanceConfig> INVALID_CONFIG = c -> {
+        c.with(NATIVE_PROTOCOL, NETWORK, GOSSIP);
+        c.set("flush_compression", "fast");
+        SSTableConfig config = new SSTableConfig();
+
+        config.compression = new Config.SSTableCompressionConfig();
+        config.compression.configurations = new LinkedHashMap<>();
+
+        Map<String, String> configMap = new HashMap<>();
+        configMap.put("chunk_length_in_kb", "32");
+        configMap.put("max_compressed_length", "64KiB"); // has to be equal to 
or lower than chunk_length
+
+        config.compression.configurations.put("zstd", new 
InheritingClass(null, ZstdCompressor.class.getName(), configMap));
+        config.compression.configurations.put("default", new 
InheritingClass("zstd", null, null));
+
+        c.set("sstable", config);
+    };
+
+    public Path setupCluster(Consumer<IInstanceConfig> config, Path root) 
throws IOException
+    {
+        Cluster.Builder builder = 
Cluster.build().withNodes(1).withConfig(config);
+
+        if (root != null)
+            builder.withRoot(root);
+
+        cluster = builder.start();
+
+        return builder.getRootPath();
+    }
+
+    @After
+    public void tearDownCluster()
+    {
+        if (cluster != null)
+        {
+            cluster.close();
+            cluster = null;
+        }
+    }
+
+    @Test
+    public void testCreation() throws IOException
+    {
+        setupCluster(DEFAULT_CONFIG, null);
+        cluster.schemaChange(format("CREATE KEYSPACE IF NOT EXISTS %s WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", 
KEYSPACE));
+        testDisabled();

Review Comment:
   Minor: These methods are used only once, it's easier and shorter to inline 
them just to avoid falling into the methods to figure out what exactly they are 
testing.



##########
src/java/org/apache/cassandra/schema/CompressionParams.java:
##########
@@ -20,184 +20,355 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.InheritingClass;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Throwables;
 
 import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT;
 
 public final class CompressionParams
 {
-    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
+    public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB
     public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0;        // 
Since pre-4.0 versions do not understand the
-                                                                        // new 
compression parameter we can't use a
-                                                                        // 
different default value.
+    // new compression parameter we can't use a
+    // different default value.
     public static final IVersionedSerializer<CompressionParams> serializer = 
new Serializer();
 
     public static final String CLASS = "class";
     public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+
+    /**
+     * Requires a DataStorageSpec suffix
+     */
+    public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length";
     public static final String ENABLED = "enabled";
     public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
 
-    public static final CompressionParams DEFAULT = 
!CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
-                                                    ? noCompression()
-                                                    : new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
-                                                                            
DEFAULT_CHUNK_LENGTH,
-                                                                            
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-                                                                            
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                            
Collections.emptyMap());
-
-    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+    public static final CompressionParams NOOP = new 
CompressionParams(NoopCompressor.create(emptyMap()),
                                                                        // 4 
KiB is often the underlying disk block size
                                                                        1024 * 
4,
                                                                        
Integer.MAX_VALUE,
                                                                        
DEFAULT_MIN_COMPRESS_RATIO,
-                                                                       
Collections.emptyMap());
+                                                                       
emptyMap());
 
-    private final ICompressor sstableCompressor;
-    private final int chunkLength;
-    private final int maxCompressedLength;  // In content we store max length 
to avoid rounding errors causing compress/decompress mismatch.
-    private final double minCompressRatio;  // In configuration we store min 
ratio, the input parameter.
-    private final ImmutableMap<String, String> otherOptions; // Unrecognized 
options, can be used by the compressor
+    /**
+     * (legacy) Default for when no other compression is specified
+     */
+    public static final CompressionParams DEFAULT = new 
CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+                                                                          
DEFAULT_CHUNK_LENGTH,
+                                                                          
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+                                                                          
DEFAULT_MIN_COMPRESS_RATIO,
+                                                                          
emptyMap());
 
-    public static CompressionParams fromMap(Map<String, String> opts)
-    {
-        Map<String, String> options = copyOptions(opts);
+    /**
+     * A guaranteed FAST compressor
+     */
+    public static final CompressionParams FAST = DEFAULT;
 
-        String sstableCompressionClass;
+    /**
+     * The default calculated from cassandra.yaml
+     */
+    private static CompressionParams CALCULATED_DEFAULT;
 
-        if (!opts.isEmpty() && isEnabled(opts) && !options.containsKey(CLASS))
-            throw new ConfigurationException(format("Missing sub-option '%s' 
for the 'compression' option.", CLASS));
+    private final ICompressor sstableCompressor;
+    /**
+     * The chunk length in KB
+     */
+    private final int chunkLength;
+    /**
+     * The compressed length in KB.
+     * In content we store max length to avoid rounding errors causing 
compress/decompress mismatch.
+     */
+    private final int maxCompressedLength;
+    /**
+     * The minimum compression ratio.
+     * In configuration, we store min ratio, the input parameter.
+     * Ths is mathematically related to chunkLength and maxCompressedLength in 
that
+     * # chunk_length / max_compressed_length = min_compress_ratio
+     */
+    private final double minCompressRatio;
+    /**
+     * unrecognized options, may contain compressor specific options
+     */
+    private final ImmutableMap<String, String> otherOptions;
 
-        if (!removeEnabled(options) && !options.isEmpty())
-            throw new ConfigurationException(format("If the '%s' option is set 
to false no other options must be specified", ENABLED));
-        else
-            sstableCompressionClass = removeSSTableCompressionClass(options);
+    /**
+     * Gets the default compression params for the keyspace.
+     * This method accounts for issues when the compression params change for 
system keyspaces.
+     *
+     * @param keyspace the name of the keyspace to get params for. (may be 
null for non-System keyspaces)
+     * @return The compresson parameters for the keyspace.
+     */
+    public static CompressionParams defaultParams(String keyspace)
+    {
+        if (keyspace != null && 
SchemaConstants.getLocalAndReplicatedSystemKeyspaceNames().contains(keyspace))

Review Comment:
   If `keyspace == null` why should we return the `DEFAULT` instead of the 
defaults, that were set in the yaml?



##########
test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java:
##########
@@ -286,7 +286,14 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.utils.concurrent.SelfRefCounted",
     "org.apache.cassandra.utils.concurrent.Transactional",
     "org.apache.cassandra.utils.concurrent.UncheckedInterruptedException",
-    "org.apache.cassandra.utils.StorageCompatibilityMode"
+    "org.apache.cassandra.utils.StorageCompatibilityMode",

Review Comment:
   Please, sort the list `validClasses`



##########
src/java/org/apache/cassandra/io/sstable/format/DataComponent.java:
##########
@@ -86,12 +86,17 @@ private static CompressionParams 
buildCompressionParams(TableMetadata metadata,
                     compressionParams = CompressionParams.NOOP;
                     break;
                 case fast:
-                    if 
(!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
+                    // The default compressor is generally Fast, but just in 
case we verify it.
+                    CompressionParams defaultCompressionParams = 
CompressionParams.defaultParams(metadata.keyspace);
+                    ICompressor maybeFastCompressor = 
defaultCompressionParams.getSstableCompressor();
+                    if 
(maybeFastCompressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
 {
+                        compressionParams = defaultCompressionParams;
+                    }
+                    else
                     {
-                        // The default compressor is generally fast (LZ4 with 
16KiB block size)
-                        compressionParams = CompressionParams.DEFAULT;
-                        break;
+                        compressionParams = CompressionParams.FAST;

Review Comment:
   Found the following test, I think assertion would be better. 
   
   `org.apache.cassandra.io.compress.CQLCompressionTest#lz4FlushTest`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to