This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new 9bfaee91c4 Optionally fail writes when SAI refuses to index a term value exceeding a configured maximum size 9bfaee91c4 is described below commit 9bfaee91c4fd7a269e3ff924e8a504bad5d6514a Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Tue Apr 9 17:32:56 2024 -0500 Optionally fail writes when SAI refuses to index a term value exceeding a configured maximum size patch by Caleb Rackliffe; reviewed by Berenguer Blasi and Stefan Miklosovic for CASSANDRA-19493 --- CHANGES.txt | 1 + NEWS.txt | 1 + conf/cassandra.yaml | 12 + .../config/CassandraRelevantProperties.java | 3 - src/java/org/apache/cassandra/config/Config.java | 11 +- .../apache/cassandra/config/GuardrailsOptions.java | 84 +++++++ .../cassandra/cql3/statements/BatchStatement.java | 2 +- .../cql3/statements/BatchUpdatesCollector.java | 23 +- .../cassandra/cql3/statements/CQL3CasRequest.java | 2 +- .../cql3/statements/ModificationStatement.java | 2 +- .../statements/SingleTableUpdatesCollector.java | 6 +- .../cql3/statements/UpdatesCollector.java | 3 +- src/java/org/apache/cassandra/db/IMutation.java | 27 +- .../apache/cassandra/db/guardrails/Guardrails.java | 98 +++++++- .../cassandra/db/guardrails/GuardrailsConfig.java | 54 ++++ .../cassandra/db/guardrails/GuardrailsMBean.java | 72 ++++++ .../cassandra/db/partitions/PartitionUpdate.java | 5 +- .../cassandra/db/virtual/VirtualMutation.java | 3 +- src/java/org/apache/cassandra/index/Index.java | 5 +- .../org/apache/cassandra/index/IndexRegistry.java | 41 ++-- .../cassandra/index/SecondaryIndexManager.java | 7 +- .../cassandra/index/internal/CassandraIndex.java | 4 +- .../cassandra/index/sai/StorageAttachedIndex.java | 77 +++--- .../index/sai/disk/v1/SSTableIndexWriter.java | 2 +- .../index/sai/memory/TrieMemoryIndex.java | 2 +- .../index/sai/memory/VectorMemoryIndex.java | 2 +- .../org/apache/cassandra/index/sasi/SASIIndex.java | 4 +- 
.../paxos/uncommitted/PaxosUncommittedIndex.java | 4 +- .../cassandra/anttasks/TestNameCheckTask.java | 26 +- .../guardrails/GuardrailColumnValueSizeTest.java | 237 ++---------------- .../guardrails/GuardrailSaiFrozenTermSizeTest.java | 139 +++++++++++ .../guardrails/GuardrailSaiStringTermSizeTest.java | 215 ++++++++++++++++ .../guardrails/GuardrailSaiVectorTermSizeTest.java | 133 ++++++++++ .../db/guardrails/ValueThresholdTester.java | 273 +++++++++++++++++++++ .../unit/org/apache/cassandra/index/StubIndex.java | 4 +- .../index/internal/CustomCassandraIndex.java | 4 +- .../index/sai/cql/AllTypesSimpleEqTest.java | 9 +- .../index/sai/cql/StorageAttachedIndexDDLTest.java | 47 ---- 38 files changed, 1267 insertions(+), 377 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 74d142089c..09c5468db4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0-beta2 + * Optionally fail writes when SAI refuses to index a term value exceeding configured term max size (CASSANDRA-19493) * Vector search can restrict on clustering keys when filtering isn't required (CASSANDRA-19544) * Fix FBUtilities' parsing of gcp cos_containerd kernel versions (CASSANDRA-18594) * Clean up KeyRangeIterator classes (CASSANDRA-19428) diff --git a/NEWS.txt b/NEWS.txt index c074867069..1ba8f6639e 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -126,6 +126,7 @@ New features - Vector dimensions - Whether it is possible to execute secondary index queries without restricting on partition key - Warning and failure thresholds for maximum referenced SAI indexes on a replica when executing a SELECT query + - Warning and failure thresholds for the size of terms written to an SAI index - It is possible to list ephemeral snapshots by nodetool listsnaphots command when flag "-e" is specified. 
- Added a new flag to `nodetool profileload` and JMX endpoint to set up recurring profile load generation on specified intervals (see CASSANDRA-17821) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 961c607f93..99dd449c84 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -2152,6 +2152,18 @@ drop_compact_storage_enabled: false # before emitting a failure (defaults to -1 to disable) #sai_sstable_indexes_per_query_fail_threshold: -1 +# Guardrail specifying warn/fail thresholds for the size of string terms written to an SAI index +# sai_string_term_size_warn_threshold: 1KiB +# sai_string_term_size_fail_threshold: 8KiB + +# Guardrail specifying warn/fail thresholds for the size of frozen terms written to an SAI index +# sai_frozen_term_size_warn_threshold: 1KiB +# sai_frozen_term_size_fail_threshold: 8KiB + +# Guardrail specifying warn/fail thresholds for the size of vector terms written to an SAI index +# sai_vector_term_size_warn_threshold: 16KiB +# sai_vector_term_size_fail_threshold: 32KiB + # The default secondary index implementation when CREATE INDEX does not specify one via USING. # ex. "legacy_local_table" - (default) legacy secondary index, implemented as a hidden table # ex. 
"sai" - "storage-attched" index, implemented via optimized SSTable/Memtable-attached indexes diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index d36530f51f..e0dfeb19ff 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -425,9 +425,6 @@ public enum CassandraRelevantProperties SAI_INTERSECTION_CLAUSE_LIMIT("cassandra.sai.intersection_clause_limit", "2"), /** Latest version to be used for SAI index writing */ SAI_LATEST_VERSION("cassandra.sai.latest_version", "aa"), - SAI_MAX_FROZEN_TERM_SIZE("cassandra.sai.max_frozen_term_size", "5KiB"), - SAI_MAX_STRING_TERM_SIZE("cassandra.sai.max_string_term_size", "1KiB"), - SAI_MAX_VECTOR_TERM_SIZE("cassandra.sai.max_vector_term_size", "32KiB"), /** Minimum number of reachable leaves for a given node to be eligible for an auxiliary posting list */ SAI_MINIMUM_POSTINGS_LEAVES("cassandra.sai.minimum_postings_leaves", "64"), diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index aa0f3ee476..30da524777 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -903,11 +903,18 @@ public class Config public volatile boolean zero_ttl_on_twcs_warned = true; public volatile boolean zero_ttl_on_twcs_enabled = true; public volatile boolean non_partition_restricted_index_query_enabled = true; - public volatile int sai_sstable_indexes_per_query_warn_threshold = 32; - public volatile int sai_sstable_indexes_per_query_fail_threshold = -1; public volatile boolean intersect_filtering_query_warned = true; public volatile boolean intersect_filtering_query_enabled = true; + public volatile int sai_sstable_indexes_per_query_warn_threshold = 32; + public volatile int sai_sstable_indexes_per_query_fail_threshold 
= -1; + public volatile DataStorageSpec.LongBytesBound sai_string_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("1KiB"); + public volatile DataStorageSpec.LongBytesBound sai_string_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("8KiB"); + public volatile DataStorageSpec.LongBytesBound sai_frozen_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("1KiB"); + public volatile DataStorageSpec.LongBytesBound sai_frozen_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("8KiB"); + public volatile DataStorageSpec.LongBytesBound sai_vector_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("16KiB"); + public volatile DataStorageSpec.LongBytesBound sai_vector_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("32KiB"); + public volatile DurationSpec.LongNanosecondsBound streaming_state_expires = new DurationSpec.LongNanosecondsBound("3d"); public volatile DataStorageSpec.LongBytesBound streaming_state_size = new DataStorageSpec.LongBytesBound("40MiB"); diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java b/src/java/org/apache/cassandra/config/GuardrailsOptions.java index 8d7bd52197..504384e0d0 100644 --- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java +++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java @@ -960,6 +960,90 @@ public class GuardrailsOptions implements GuardrailsConfig x -> config.sai_sstable_indexes_per_query_fail_threshold = x); } + @Override + @Nullable + public DataStorageSpec.LongBytesBound getSaiStringTermSizeWarnThreshold() + { + return config.sai_string_term_size_warn_threshold; + } + + @Override + @Nullable + public DataStorageSpec.LongBytesBound getSaiStringTermSizeFailThreshold() + { + return config.sai_string_term_size_fail_threshold; + } + + @Override + public void setSaiStringTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail) + { + validateSizeThreshold(warn, fail, 
false, "sai_string_term_size"); + updatePropertyWithLogging("sai_string_term_size_warn_threshold", + warn, + () -> config.sai_string_term_size_warn_threshold, + x -> config.sai_string_term_size_warn_threshold = x); + updatePropertyWithLogging("sai_string_term_size_fail_threshold", + fail, + () -> config.sai_string_term_size_fail_threshold, + x -> config.sai_string_term_size_fail_threshold = x); + } + + @Override + @Nullable + public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeWarnThreshold() + { + return config.sai_frozen_term_size_warn_threshold; + } + + @Override + @Nullable + public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeFailThreshold() + { + return config.sai_frozen_term_size_fail_threshold; + } + + @Override + public void setSaiFrozenTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail) + { + validateSizeThreshold(warn, fail, false, "sai_frozen_term_size"); + updatePropertyWithLogging("sai_frozen_term_size_warn_threshold", + warn, + () -> config.sai_frozen_term_size_warn_threshold, + x -> config.sai_frozen_term_size_warn_threshold = x); + updatePropertyWithLogging("sai_frozen_term_size_fail_threshold", + fail, + () -> config.sai_frozen_term_size_fail_threshold, + x -> config.sai_frozen_term_size_fail_threshold = x); + } + + @Override + @Nullable + public DataStorageSpec.LongBytesBound getSaiVectorTermSizeWarnThreshold() + { + return config.sai_vector_term_size_warn_threshold; + } + + @Override + @Nullable + public DataStorageSpec.LongBytesBound getSaiVectorTermSizeFailThreshold() + { + return config.sai_vector_term_size_fail_threshold; + } + + @Override + public void setSaiVectorTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail) + { + validateSizeThreshold(warn, fail, false, "sai_vector_term_size"); + updatePropertyWithLogging("sai_vector_term_size_warn_threshold", + warn, + () -> config.sai_vector_term_size_warn_threshold, 
+ x -> config.sai_vector_term_size_warn_threshold = x); + updatePropertyWithLogging("sai_vector_term_size_fail_threshold", + fail, + () -> config.sai_vector_term_size_fail_threshold, + x -> config.sai_vector_term_size_fail_threshold = x); + } + @Override public boolean getNonPartitionRestrictedQueryEnabled() { diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 9f5ac2ff26..eea09100b0 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -320,7 +320,7 @@ public class BatchStatement implements CQLStatement ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs }) .getMessage()); } - return collector.toMutations(); + return collector.toMutations(state); } /** diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java index c346eb9671..521cd2afa6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java @@ -18,19 +18,28 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.commitlog.CommitLogSegment; +import 
org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.virtual.VirtualMutation; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.service.ClientState; import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; @@ -122,9 +131,13 @@ final class BatchUpdatesCollector implements UpdatesCollector /** * Returns a collection containing all the mutations. + * + * @param state state related to the client connection + * * @return a collection containing all the mutations. */ - public List<IMutation> toMutations() + @Override + public List<IMutation> toMutations(ClientState state) { List<IMutation> ms = new ArrayList<>(); for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values()) @@ -132,7 +145,7 @@ final class BatchUpdatesCollector implements UpdatesCollector for (IMutationBuilder builder : ksMap.values()) { IMutation mutation = builder.build(); - mutation.validateIndexedColumns(); + mutation.validateIndexedColumns(state); mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE); ms.add(mutation); } diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 02721dd99d..9671592c16 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -243,7 +243,7 @@ public class CQL3CasRequest implements CASRequest upd.applyUpdates(current, updateBuilder, clientState); PartitionUpdate partitionUpdate = updateBuilder.build(); - IndexRegistry.obtain(metadata).validate(partitionUpdate); + IndexRegistry.obtain(metadata).validate(partitionUpdate, clientState); return partitionUpdate; } diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 1f8f3f037a..61fbd6f391 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -747,7 +747,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa HashMultiset<ByteBuffer> perPartitionKeyCounts = HashMultiset.create(keys); SingleTableUpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts); addUpdates(collector, keys, state, options, local, timestamp, nowInSeconds, queryStartNanoTime); - return collector.toMutations(); + return collector.toMutations(state); } final void addUpdates(UpdatesCollector collector, diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java index 14b1660764..5ff299eb88 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.virtual.VirtualMutation; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; /** * Utility class to collect updates. @@ -92,7 +93,8 @@ final class SingleTableUpdatesCollector implements UpdatesCollector * Returns a collection containing all the mutations. * @return a collection containing all the mutations. 
*/ - public List<IMutation> toMutations() + @Override + public List<IMutation> toMutations(ClientState state) { List<IMutation> ms = new ArrayList<>(puBuilders.size()); for (PartitionUpdate.Builder builder : puBuilders.values()) @@ -106,7 +108,7 @@ final class SingleTableUpdatesCollector implements UpdatesCollector else mutation = new Mutation(builder.build()); - mutation.validateIndexedColumns(); + mutation.validateIndexedColumns(state); mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE); ms.add(mutation); } diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java index c3dd334971..40b75ab5fa 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java @@ -25,9 +25,10 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; public interface UpdatesCollector { PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency); - List<IMutation> toMutations(); + List<IMutation> toMutations(ClientState state); } diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java index 801c9a9c8c..1998e2c035 100644 --- a/src/java/org/apache/cassandra/db/IMutation.java +++ b/src/java/org/apache/cassandra/db/IMutation.java @@ -24,24 +24,25 @@ import java.util.function.Supplier; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.ClientState; public interface IMutation { - public long MAX_MUTATION_SIZE = 
DatabaseDescriptor.getMaxMutationSize(); + long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize(); - public void apply(); - public String getKeyspaceName(); - public Collection<TableId> getTableIds(); - public DecoratedKey key(); - public long getTimeout(TimeUnit unit); - public String toString(boolean shallow); - public Collection<PartitionUpdate> getPartitionUpdates(); - public Supplier<Mutation> hintOnFailure(); + void apply(); + String getKeyspaceName(); + Collection<TableId> getTableIds(); + DecoratedKey key(); + long getTimeout(TimeUnit unit); + String toString(boolean shallow); + Collection<PartitionUpdate> getPartitionUpdates(); + Supplier<Mutation> hintOnFailure(); - public default void validateIndexedColumns() + default void validateIndexedColumns(ClientState state) { for (PartitionUpdate pu : getPartitionUpdates()) - pu.validateIndexedColumns(); + pu.validateIndexedColumns(state); } /** @@ -52,14 +53,14 @@ public interface IMutation * @param overhead overhadd to add for mutation size to validate. Pass zero if not required but not a negative value. * @throws MutationExceededMaxSizeException if {@link DatabaseDescriptor#getMaxMutationSize()} is exceeded */ - public void validateSize(int version, int overhead); + void validateSize(int version, int overhead); /** * Computes the total data size of the specified mutations. * @param mutations the mutations * @return the total data size of the specified mutations */ - public static long dataSize(Collection<? extends IMutation> mutations) + static long dataSize(Collection<? 
extends IMutation> mutations) { long size = 0; for (IMutation mutation : mutations) diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java index 01157bf345..f61ae204d1 100644 --- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java +++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java @@ -359,7 +359,7 @@ public final class Guardrails implements GuardrailsMBean state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getColumnValueSizeWarnThreshold()), state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getColumnValueSizeFailThreshold()), (isWarning, what, value, threshold) -> - format("Value of column %s has size %s, this exceeds the %s threshold of %s.", + format("Value of column '%s' has size %s, this exceeds the %s threshold of %s.", what, value, isWarning ? "warning" : "failure", threshold)); /** @@ -501,6 +501,42 @@ public final class Guardrails implements GuardrailsMBean format("The number of SSTable indexes queried on index %s violated %s threshold value %s with value %s", what, isWarning ? "warning" : "failure", threshold, value))); + /** + * Guardrail on the size of a string term written to SAI index. + */ + public static final MaxThreshold saiStringTermSize = + new MaxThreshold("sai_string_term_size", + null, + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiStringTermSizeWarnThreshold()), + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiStringTermSizeFailThreshold()), + (isWarning, what, value, threshold) -> + format("Value of column '%s' has size %s, this exceeds the %s threshold of %s.", + what, value, isWarning ? "warning" : "failure", threshold)); + + /** + * Guardrail on the size of a frozen term written to SAI index. 
+ */ + public static final MaxThreshold saiFrozenTermSize = + new MaxThreshold("sai_frozen_term_size", + null, + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiFrozenTermSizeWarnThreshold()), + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiFrozenTermSizeFailThreshold()), + (isWarning, what, value, threshold) -> + format("Value of column '%s' has size %s, this exceeds the %s threshold of %s.", + what, value, isWarning ? "warning" : "failure", threshold)); + + /** + * Guardrail on the size of a vector term written to SAI index. + */ + public static final MaxThreshold saiVectorTermSize = + new MaxThreshold("sai_vector_term_size", + null, + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiVectorTermSizeWarnThreshold()), + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiVectorTermSizeFailThreshold()), + (isWarning, what, value, threshold) -> + format("Value of column '%s' has size %s, this exceeds the %s threshold of %s.", + what, value, isWarning ? 
"warning" : "failure", threshold)); + public static final EnableFlag nonPartitionRestrictedIndexQueryEnabled = new EnableFlag("non_partition_restricted_index_query_enabled", "Executing a query on secondary indexes without partition key restriction might degrade performance", @@ -1248,6 +1284,66 @@ public final class Guardrails implements GuardrailsMBean DEFAULT_CONFIG.setSaiSSTableIndexesPerQueryThreshold(warn, fail); } + @Override + @Nullable + public String getSaiStringTermSizeWarnThreshold() + { + return sizeToString(DEFAULT_CONFIG.getSaiStringTermSizeWarnThreshold()); + } + + @Override + @Nullable + public String getSaiStringTermSizeFailThreshold() + { + return sizeToString(DEFAULT_CONFIG.getSaiStringTermSizeFailThreshold()); + } + + @Override + public void setSaiStringTermSizeThreshold(@Nullable String warnSize, @Nullable String failSize) + { + DEFAULT_CONFIG.setSaiStringTermSizeThreshold(sizeFromString(warnSize), sizeFromString(failSize)); + } + + @Override + @Nullable + public String getSaiFrozenTermSizeWarnThreshold() + { + return sizeToString(DEFAULT_CONFIG.getSaiFrozenTermSizeWarnThreshold()); + } + + @Override + @Nullable + public String getSaiFrozenTermSizeFailThreshold() + { + return sizeToString(DEFAULT_CONFIG.getSaiFrozenTermSizeFailThreshold()); + } + + @Override + public void setSaiFrozenTermSizeThreshold(@Nullable String warnSize, @Nullable String failSize) + { + DEFAULT_CONFIG.setSaiFrozenTermSizeThreshold(sizeFromString(warnSize), sizeFromString(failSize)); + } + + @Override + @Nullable + public String getSaiVectorTermSizeWarnThreshold() + { + return sizeToString(DEFAULT_CONFIG.getSaiVectorTermSizeWarnThreshold()); + } + + @Override + @Nullable + public String getSaiVectorTermSizeFailThreshold() + { + return sizeToString(DEFAULT_CONFIG.getSaiVectorTermSizeFailThreshold()); + } + + @Override + public void setSaiVectorTermSizeThreshold(@Nullable String warnSize, @Nullable String failSize) + { + 
DEFAULT_CONFIG.setSaiVectorTermSizeThreshold(sizeFromString(warnSize), sizeFromString(failSize)); + } + @Override public boolean getNonPartitionRestrictedQueryEnabled() { diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java index 5c6880fde5..508ae53fc0 100644 --- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java +++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java @@ -464,6 +464,60 @@ public interface GuardrailsConfig */ void setSaiSSTableIndexesPerQueryThreshold(int warn, int fail); + /** + * @return the warning threshold for the size of string terms written to an SAI index + */ + DataStorageSpec.LongBytesBound getSaiStringTermSizeWarnThreshold(); + + /** + * @return the failure threshold for the size of string terms written to an SAI index + */ + DataStorageSpec.LongBytesBound getSaiStringTermSizeFailThreshold(); + + /** + * Sets warning and failure thresholds for the size of string terms written to an SAI index + * + * @param warn value to set for warn threshold + * @param fail value to set for fail threshold + */ + void setSaiStringTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail); + + /** + * @return the warning threshold for the size of frozen terms written to an SAI index + */ + DataStorageSpec.LongBytesBound getSaiFrozenTermSizeWarnThreshold(); + + /** + * @return the failure threshold for the size of frozen terms written to an SAI index + */ + DataStorageSpec.LongBytesBound getSaiFrozenTermSizeFailThreshold(); + + /** + * Sets warning and failure thresholds for the size of frozen terms written to an SAI index + * + * @param warn value to set for warn threshold + * @param fail value to set for fail threshold + */ + void setSaiFrozenTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail); + + /** + * @return the 
warning threshold for the size of vector terms written to an SAI index + */ + DataStorageSpec.LongBytesBound getSaiVectorTermSizeWarnThreshold(); + + /** + * @return the failure threshold for the size of vector terms written to an SAI index + */ + DataStorageSpec.LongBytesBound getSaiVectorTermSizeFailThreshold(); + + /** + * Sets warning and failure thresholds for the size of vector terms written to an SAI index + * + * @param warn value to set for warn threshold + * @param fail value to set for fail threshold + */ + void setSaiVectorTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail); + /** * Returns whether it is possible to execute a query against secondary indexes without specifying * any partition key restrictions. diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java index 89fe7a70ba..9a3750bba8 100644 --- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java +++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java @@ -789,6 +789,78 @@ public interface GuardrailsMBean */ void setSaiSSTableIndexesPerQueryThreshold(int warn, int fail); + /** + * @return The warning threshold for string terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) A {@code null} value means disabled. + */ + @Nullable + String getSaiStringTermSizeWarnThreshold(); + + /** + * @return The failure threshold for string terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) A {@code null} value means disabled. + */ + @Nullable + String getSaiStringTermSizeFailThreshold(); + + /** + * @param warnSize The warning threshold for string terms written to an SAI index, as a human-readable string. + * (ex. 
{@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) + * A {@code null} value means disabled. + * @param failSize The failure threshold for string terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) + * A {@code null} value means disabled. + */ + void setSaiStringTermSizeThreshold(@Nullable String warnSize, @Nullable String failSize); + + /** + * @return The warning threshold for frozen terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) A {@code null} value means disabled. + */ + @Nullable + String getSaiFrozenTermSizeWarnThreshold(); + + /** + * @return The failure threshold for frozen terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) A {@code null} value means disabled. + */ + @Nullable + String getSaiFrozenTermSizeFailThreshold(); + + /** + * @param warnSize The warning threshold for frozen terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) + * A {@code null} value means disabled. + * @param failSize The failure threshold for frozen terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) + * A {@code null} value means disabled. + */ + void setSaiFrozenTermSizeThreshold(@Nullable String warnSize, @Nullable String failSize); + + /** + * @return The warning threshold for vector terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) A {@code null} value means disabled. + */ + @Nullable + String getSaiVectorTermSizeWarnThreshold(); + + /** + * @return The failure threshold for vector terms written to an SAI index, as a human-readable string. + * (ex. 
{@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) A {@code null} value means disabled. + */ + @Nullable + String getSaiVectorTermSizeFailThreshold(); + + /** + * @param warnSize The warning threshold for vector terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) + * A {@code null} value means disabled. + * @param failSize The failure threshold for vector terms written to an SAI index, as a human-readable string. + * (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}) + * A {@code null} value means disabled. + */ + void setSaiVectorTermSizeThreshold(@Nullable String warnSize, @Nullable String failSize); + /** * Returns whether it is possible to execute a query against secondary indexes without specifying * any partition key restrictions. diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 1047fc0a70..035cb0edd6 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -44,6 +44,7 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.UpdateFunction; import org.apache.cassandra.utils.vint.VIntCoding; @@ -542,9 +543,9 @@ public class PartitionUpdate extends AbstractBTreePartition return new SimpleBuilders.PartitionUpdateBuilder(metadata, partitionKeyValues); } - public void validateIndexedColumns() + public void validateIndexedColumns(ClientState state) { - IndexRegistry.obtain(metadata()).validate(this); + IndexRegistry.obtain(metadata()).validate(this, state); } @VisibleForTesting diff --git 
a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java index 3e26032383..8c3b5b4afd 100644 --- a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java +++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java @@ -30,6 +30,7 @@ import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.ClientState; /** * A specialised IMutation implementation for virtual keyspaces. @@ -113,7 +114,7 @@ public final class VirtualMutation implements IMutation } @Override - public void validateIndexedColumns() + public void validateIndexedColumns(ClientState state) { // no-op } diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index 18fed751af..8abc800e0f 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -66,6 +66,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; /** * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations, @@ -498,9 +499,11 @@ public interface Index * will process it. 
The partition key as well as the clustering and * cell values for each row in the update may be checked by index * implementations + * * @param update PartitionUpdate containing the values to be validated by registered Index implementations + * @param state state related to the client connection */ - public void validate(PartitionUpdate update) throws InvalidRequestException; + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException; /** * Returns the SSTable-attached {@link Component}s created by this index. diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java index 46e87f357f..d29bb11db4 100644 --- a/src/java/org/apache/cassandra/index/IndexRegistry.java +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@ -47,6 +47,7 @@ import org.apache.cassandra.io.sstable.SSTableFlushObserver; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; /** * The collection of all Index instances for a base table. 
@@ -98,7 +99,7 @@ public interface IndexRegistry } @Override - public void validate(PartitionUpdate update) + public void validate(PartitionUpdate update, ClientState state) { } }; @@ -113,21 +114,25 @@ public interface IndexRegistry { final Index index = new Index() { + @Override public Callable<?> getInitializationTask() { return null; } + @Override public IndexMetadata getIndexMetadata() { return null; } + @Override public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata) { return null; } + @Override public void register(IndexRegistry registry) { } @@ -137,60 +142,72 @@ public interface IndexRegistry { } + @Override public Optional<ColumnFamilyStore> getBackingTable() { return Optional.empty(); } + @Override public Callable<?> getBlockingFlushTask() { return null; } + @Override public Callable<?> getInvalidateTask() { return null; } + @Override public Callable<?> getTruncateTask(long truncatedAt) { return null; } + @Override public boolean shouldBuildBlocking() { return false; } + @Override public boolean dependsOn(ColumnMetadata column) { return false; } + @Override public boolean supportsExpression(ColumnMetadata column, Operator operator) { return true; } + @Override public AbstractType<?> customExpressionValueType() { return BytesType.instance; } + @Override public RowFilter getPostIndexQueryFilter(RowFilter filter) { return null; } + @Override public long getEstimatedResultRows() { return 0; } - public void validate(PartitionUpdate update) throws InvalidRequestException + @Override + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException { } + @Override public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, long nowInSec, WriteContext ctx, IndexTransaction.Type transactionType, Memtable memtable) { return null; @@ -210,16 +227,6 @@ public interface IndexRegistry return Collections.singleton(index); } - @Override - public void addIndex(Index index) - { - } - - @Override - public void 
removeIndex(Index index) - { - } - @Override public boolean containsIndex(Index i) { @@ -254,6 +261,7 @@ public interface IndexRegistry } }; + @Override public void registerIndex(Index index, Index.Group.Key groupKey, Supplier<Index.Group> groupSupplier) { } @@ -263,11 +271,13 @@ public interface IndexRegistry { } + @Override public Index getIndex(IndexMetadata indexMetadata) { return index; } + @Override public Collection<Index> listIndexes() { return Collections.singletonList(index); @@ -279,12 +289,14 @@ public interface IndexRegistry return Collections.singletonList(group); } + @Override public Optional<Index> getBestIndexFor(RowFilter.Expression expression) { return Optional.empty(); } - public void validate(PartitionUpdate update) + @Override + public void validate(PartitionUpdate update, ClientState state) { } }; @@ -313,8 +325,9 @@ public interface IndexRegistry * implementations * * @param update PartitionUpdate containing the values to be validated by registered Index implementations + * @param state state related to the client connection */ - void validate(PartitionUpdate update); + void validate(PartitionUpdate update, ClientState state); /** * Returns the {@code IndexRegistry} associated to the specified table. 
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index da15282b39..ebd2cc0379 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -75,6 +75,7 @@ import org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.pager.SinglePartitionPager; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.ProtocolVersion; @@ -1295,11 +1296,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * implementations * * @param update PartitionUpdate containing the values to be validated by registered Index implementations + * @param state state related to the client connection */ - public void validate(PartitionUpdate update) throws InvalidRequestException + @Override + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException { for (Index index : indexes.values()) - index.validate(update); + index.validate(update, state); } /* diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 125c6e9dbd..1f39ea502b 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -57,6 +57,7 @@ import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.ClientState; import 
org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Refs; @@ -308,7 +309,8 @@ public abstract class CassandraIndex implements Index } - public void validate(PartitionUpdate update) throws InvalidRequestException + @Override + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException { switch (indexedColumn.kind) { diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java index d32176a9aa..b42a165f11 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java @@ -42,8 +42,7 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.FutureCombiner; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +63,9 @@ import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.guardrails.GuardrailViolatedException; +import org.apache.cassandra.db.guardrails.Guardrails; +import org.apache.cassandra.db.guardrails.MaxThreshold; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.FloatType; @@ -102,17 +104,17 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; import 
org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.concurrent.OpOrder; -import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_FROZEN_TERM_SIZE; -import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_STRING_TERM_SIZE; -import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE; import static org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig.MAX_TOP_K; public class StorageAttachedIndex implements Index @@ -142,11 +144,7 @@ public class StorageAttachedIndex implements Index private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); - public static final long MAX_STRING_TERM_SIZE = SAI_MAX_STRING_TERM_SIZE.getSizeInBytes(); - public static final long MAX_FROZEN_TERM_SIZE = SAI_MAX_FROZEN_TERM_SIZE.getSizeInBytes(); - public static final long MAX_VECTOR_TERM_SIZE = SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes(); - public static final String TERM_OVERSIZE_MESSAGE = "Can't add term of column %s to index for key: %s, term size %s " + - "max allowed size %s, use analyzed = true (if not yet set) for that column."; + public static final String TERM_OVERSIZE_MESSAGE = "Term in column '%s' for key '%s' is too large and cannot be indexed. 
(term size: %s)"; // Used to build indexes on newly added SSTables: private static final StorageAttachedIndexBuildingSupport INDEX_BUILDER_SUPPORT = new StorageAttachedIndexBuildingSupport(); @@ -182,7 +180,7 @@ public class StorageAttachedIndex implements Index private final PrimaryKey.Factory primaryKeyFactory; private final MemtableIndexManager memtableIndexManager; private final IndexMetrics indexMetrics; - private final long maxTermSize; + private final MaxThreshold maxTermSizeGuardrail; // Tracks whether we've started the index build on initialization. private volatile boolean initBuildStarted = false; @@ -206,8 +204,10 @@ public class StorageAttachedIndex implements Index analyzerFactory = AbstractAnalyzer.fromOptions(indexTermType, indexMetadata.options); memtableIndexManager = new MemtableIndexManager(this); indexMetrics = new IndexMetrics(this, memtableIndexManager); - maxTermSize = indexTermType.isVector() ? MAX_VECTOR_TERM_SIZE - : (indexTermType.isFrozen() ? MAX_FROZEN_TERM_SIZE : MAX_STRING_TERM_SIZE); + maxTermSizeGuardrail = indexTermType.isVector() + ? Guardrails.saiVectorTermSize + : (indexTermType.isFrozen() ? 
Guardrails.saiFrozenTermSize + : Guardrails.saiStringTermSize); } /** @@ -497,11 +497,15 @@ public class StorageAttachedIndex implements Index } @Override - public void validate(PartitionUpdate update) throws InvalidRequestException + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException { DecoratedKey key = update.partitionKey(); - for (Row row : update) - validateMaxTermSizeForRow(key, row, true); + + if (indexTermType.columnMetadata().isStatic()) + validateTermSizeForRow(key, update.staticRow(), true, state); + else + for (Row row : update) + validateTermSizeForRow(key, row, true, state); } @Override @@ -730,60 +734,62 @@ public class StorageAttachedIndex implements Index /** * Validate maximum term size for given row */ - public void validateMaxTermSizeForRow(DecoratedKey key, Row row, boolean sendClientWarning) + public void validateTermSizeForRow(DecoratedKey key, Row row, boolean isClientMutation, ClientState state) { AbstractAnalyzer analyzer = hasAnalyzer() ? 
analyzer() : null; if (indexTermType.isNonFrozenCollection()) { Iterator<ByteBuffer> bufferIterator = indexTermType.valuesOf(row, FBUtilities.nowInSeconds()); while (bufferIterator != null && bufferIterator.hasNext()) - validateMaxTermSizeForCell(analyzer, key, bufferIterator.next(), sendClientWarning); + validateTermSizeForCell(analyzer, key, bufferIterator.next(), isClientMutation, state); } else { ByteBuffer value = indexTermType.valueOf(key, row, FBUtilities.nowInSeconds()); - validateMaxTermSizeForCell(analyzer, key, value, sendClientWarning); + validateTermSizeForCell(analyzer, key, value, isClientMutation, state); } } - private void validateMaxTermSizeForCell(AbstractAnalyzer analyzer, DecoratedKey key, @Nullable ByteBuffer cellBuffer, boolean sendClientWarning) + private void validateTermSizeForCell(AbstractAnalyzer analyzer, DecoratedKey key, @Nullable ByteBuffer cellBuffer, boolean isClientMutation, ClientState state) { if (cellBuffer == null || cellBuffer.remaining() == 0) return; // analyzer should not return terms that are larger than the origin value. - if (cellBuffer.remaining() <= maxTermSize) + if (!maxTermSizeGuardrail.warnsOn(cellBuffer.remaining(), null)) return; if (analyzer != null) { analyzer.reset(cellBuffer.duplicate()); while (analyzer.hasNext()) - validateMaxTermSize(key, analyzer.next(), sendClientWarning); + validateTermSize(key, analyzer.next(), isClientMutation, state); } else { - validateMaxTermSize(key, cellBuffer.duplicate(), sendClientWarning); + validateTermSize(key, cellBuffer.duplicate(), isClientMutation, state); } } /** - * Validate maximum term size for given term - * @return true if given term is valid; otherwise false. 
+ * @return true if the size of the given term is below the maximum term size, false otherwise + * + * @throws GuardrailViolatedException if a client mutation contains a term that breaches the failure threshold */ - public boolean validateMaxTermSize(DecoratedKey key, ByteBuffer term, boolean sendClientWarning) + public boolean validateTermSize(DecoratedKey key, ByteBuffer term, boolean isClientMutation, ClientState state) { - if (term.remaining() > maxTermSize) + if (isClientMutation) { - String message = indexIdentifier.logMessage(String.format(TERM_OVERSIZE_MESSAGE, - indexTermType.columnName(), - key, - FBUtilities.prettyPrintMemory(term.remaining()), - FBUtilities.prettyPrintMemory(maxTermSize))); - - if (sendClientWarning) - ClientWarn.instance.warn(message); + maxTermSizeGuardrail.guard(term.remaining(), indexTermType.columnName(), false, state); + return true; + } + if (maxTermSizeGuardrail.failsOn(term.remaining(), state)) + { + String message = indexIdentifier.logMessage(String.format(TERM_OVERSIZE_MESSAGE, + indexTermType.columnName(), + key, + FBUtilities.prettyPrintMemory(term.remaining()))); noSpamLogger.warn(message); return false; } @@ -869,6 +875,7 @@ public class StorageAttachedIndex implements Index return FutureCombiner.allOf(futures); } + @SuppressWarnings("SameReturnValue") private Future<?> startPreJoinTask() { try diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index 06bb7d80fe..58ee69a215 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -187,7 +187,7 @@ public class SSTableIndexWriter implements PerColumnIndexWriter private void addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId) throws IOException { - if (!index.validateMaxTermSize(key.partitionKey(), term, false)) + if 
(!index.validateTermSize(key.partitionKey(), term, false, null)) return; if (currentBuilder == null) diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java index 9997616d8b..c8d32a8386 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java @@ -204,7 +204,7 @@ public class TrieMemoryIndex extends MemoryIndex private void addTerm(PrimaryKey primaryKey, ByteBuffer term) { - if (index.validateMaxTermSize(primaryKey.partitionKey(), term, false)) + if (index.validateTermSize(primaryKey.partitionKey(), term, false, null)) { setMinMaxTerm(term.duplicate()); diff --git a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java index ad94f4475d..bea5cb877f 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java @@ -77,7 +77,7 @@ public class VectorMemoryIndex extends MemoryIndex @Override public synchronized long add(DecoratedKey key, Clustering<?> clustering, ByteBuffer value) { - if (value == null || value.remaining() == 0 || !index.validateMaxTermSize(key, value, false)) + if (value == null || value.remaining() == 0 || !index.validateTermSize(key, value, false, null)) return 0; var primaryKey = index.hasClustering() ? 
index.keyFactory().create(key, clustering) diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index 93448f9e78..ccfed7f5c9 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -76,6 +76,7 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -266,7 +267,8 @@ public class SASIIndex implements Index, INotificationConsumer return Long.MIN_VALUE; } - public void validate(PartitionUpdate update) throws InvalidRequestException + @Override + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException {} @Override diff --git a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java index bfb73dba05..5348087279 100644 --- a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java +++ b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java @@ -62,6 +62,7 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.CloseableIterator; import static java.util.Collections.singletonList; @@ -238,7 +239,8 @@ public class PaxosUncommittedIndex implements Index, PaxosUncommittedTracker.Upd return 0; } - public void validate(PartitionUpdate update) throws InvalidRequestException + @Override 
+ public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException { } diff --git a/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java b/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java index 29b2f767e3..b1c17ad2c3 100644 --- a/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java +++ b/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java @@ -24,7 +24,10 @@ import java.lang.reflect.Modifier; import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Paths; +import java.util.ArrayDeque; import java.util.Arrays; +import java.util.Deque; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -158,14 +161,21 @@ public class TestNameCheckTask */ private Stream<? extends Class<?>> expand(Class<?> klass, Reflections reflections) { - Set<? extends Class<?>> subTypes = reflections.getSubTypesOf(klass); - if (subTypes == null || subTypes.isEmpty()) - return Stream.of(klass); - Stream<? 
extends Class<?>> subs = subTypes.stream(); - // assume we include if not abstract - if (!Modifier.isAbstract(klass.getModifiers())) - subs = Stream.concat(Stream.of(klass), subs); - return subs; + Set<Class<?>> concreteTypes = new HashSet<>(); + Deque<Class<?>> typeStack = new ArrayDeque<>(); + typeStack.push(klass); + + while (!typeStack.isEmpty()) + { + Class<?> type = typeStack.pop(); + + if (!Modifier.isAbstract(type.getModifiers())) + concreteTypes.add(type); + + reflections.getSubTypesOf(type).forEach(typeStack::push); + } + + return concreteTypes.stream(); } public static void main(String[] args) diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java index f0513daa9c..7827508dd4 100644 --- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java @@ -18,21 +18,11 @@ package org.apache.cassandra.db.guardrails; -import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.function.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import org.junit.Test; import org.apache.cassandra.config.DataStorageSpec; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.ListType; -import org.apache.cassandra.db.marshal.MapType; -import org.apache.cassandra.db.marshal.SetType; import static java.lang.String.format; import static java.nio.ByteBuffer.allocate; @@ -41,7 +31,7 @@ import static org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES; /** * Tests the guardrail for the size of column values, {@link Guardrails#columnValueSize}. 
*/ -public class GuardrailColumnValueSizeTest extends ThresholdTester +public class GuardrailColumnValueSizeTest extends ValueThresholdTester { private static final int WARN_THRESHOLD = 1024; // bytes private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes @@ -58,6 +48,18 @@ public class GuardrailColumnValueSizeTest extends ThresholdTester size -> new DataStorageSpec.LongBytesBound(size).toBytes()); } + @Override + protected int warnThreshold() + { + return WARN_THRESHOLD; + } + + @Override + protected int failThreshold() + { + return FAIL_THRESHOLD; + } + @Test public void testSimplePartitionKey() throws Throwable { @@ -386,10 +388,10 @@ public class GuardrailColumnValueSizeTest extends ThresholdTester { createTable("CREATE TABLE %s (k text, c text, v text, s text STATIC, PRIMARY KEY(k, c))"); - // partition key, the CAS updates with values beyond the threshold are not applied so they don't come to fail + // partition key, the CAS updates with values beyond the threshold are not applied, so they don't come to fail testNoThreshold("UPDATE %s SET v = '0' WHERE k = ? AND c = '0' IF EXISTS"); - // clustering key, the CAS updates with values beyond the threshold are not applied so they don't come to fail + // clustering key, the CAS updates with values beyond the threshold are not applied, so they don't come to fail testNoThreshold("UPDATE %s SET v = '0' WHERE k = '0' AND c = ? IF EXISTS"); // static column, only the applied CAS updates can fire the guardrail @@ -442,213 +444,4 @@ public class GuardrailColumnValueSizeTest extends ThresholdTester testNoThreshold("SELECT * FROM %s WHERE s = ? ALLOW FILTERING"); testNoThreshold("SELECT * FROM %s WHERE r = ? ALLOW FILTERING"); } - - /** - * Tests that the max column size guardrail threshold is not applied for the specified 1-placeholder CQL query. 
- * - * @param query a CQL modification statement with exactly one placeholder - */ - private void testNoThreshold(String query) throws Throwable - { - assertValid(query, allocate(1)); - - assertValid(query, allocate(WARN_THRESHOLD)); - assertValid(query, allocate(WARN_THRESHOLD + 1)); - - assertValid(query, allocate(FAIL_THRESHOLD)); - assertValid(query, allocate(FAIL_THRESHOLD + 1)); - } - - /** - * Tests that the max column size guardrail threshold is not applied for the specified 2-placeholder CQL query. - * - * @param query a CQL modification statement with exactly two placeholders - */ - private void testNoThreshold2(String query) throws Throwable - { - assertValid(query, allocate(1), allocate(1)); - - assertValid(query, allocate(WARN_THRESHOLD), allocate(1)); - assertValid(query, allocate(1), allocate(WARN_THRESHOLD)); - assertValid(query, allocate((WARN_THRESHOLD)), allocate((WARN_THRESHOLD))); - assertValid(query, allocate(WARN_THRESHOLD + 1), allocate(1)); - assertValid(query, allocate(1), allocate(WARN_THRESHOLD + 1)); - - assertValid(query, allocate(FAIL_THRESHOLD), allocate(1)); - assertValid(query, allocate(1), allocate(FAIL_THRESHOLD)); - assertValid(query, allocate((FAIL_THRESHOLD)), allocate((FAIL_THRESHOLD))); - assertValid(query, allocate(FAIL_THRESHOLD + 1), allocate(1)); - assertValid(query, allocate(1), allocate(FAIL_THRESHOLD + 1)); - } - - /** - * Tests that the max column size guardrail threshold is applied for the specified 1-placeholder CQL query. - * - * @param column the name of the column referenced by the query placeholder - * @param query a CQL query with exactly one placeholder - */ - private void testThreshold(String column, String query) throws Throwable - { - testThreshold(column, query, 0); - } - - /** - * Tests that the max column size guardrail threshold is applied for the specified 1-placeholder CQL query. 
- * - * @param column the name of the column referenced by the query placeholder - * @param query a CQL query with exactly one placeholder - * @param serializationBytes the extra bytes added to the placeholder value by its wrapping column type serializer - */ - private void testThreshold(String column, String query, int serializationBytes) throws Throwable - { - int warn = WARN_THRESHOLD - serializationBytes; - int fail = FAIL_THRESHOLD - serializationBytes; - - assertValid(query, allocate(0)); - assertValid(query, allocate(warn)); - assertWarns(column, query, allocate(warn + 1)); - assertFails(column, query, allocate(fail + 1)); - } - - /** - * Tests that the max column size guardrail threshold is applied for the specified 2-placeholder CQL query. - * - * @param column the name of the column referenced by the placeholders - * @param query a CQL query with exactly two placeholders - */ - private void testThreshold2(String column, String query) throws Throwable - { - testThreshold2(column, query, 0); - } - - /** - * Tests that the max column size guardrail threshold is applied for the specified 2-placeholder query. 
- * - * @param column the name of the column referenced by the placeholders - * @param query a CQL query with exactly two placeholders - * @param serializationBytes the extra bytes added to the size of the placeholder value by their wrapping serializer - */ - private void testThreshold2(String column, String query, int serializationBytes) throws Throwable - { - int warn = WARN_THRESHOLD - serializationBytes; - int fail = FAIL_THRESHOLD - serializationBytes; - - assertValid(query, allocate(0), allocate(0)); - assertValid(query, allocate(warn), allocate(0)); - assertValid(query, allocate(0), allocate(warn)); - assertValid(query, allocate(warn / 2), allocate(warn / 2)); - - assertWarns(column, query, allocate(warn + 1), allocate(0)); - assertWarns(column, query, allocate(0), allocate(warn + 1)); - - assertFails(column, query, allocate(fail + 1), allocate(0)); - assertFails(column, query, allocate(0), allocate(fail + 1)); - } - - private void testCollection(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable - { - assertValid(query, collectionBuilder, allocate(1)); - assertValid(query, collectionBuilder, allocate(1), allocate(1)); - assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD)); - assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD), allocate(1)); - assertValid(query, collectionBuilder, allocate(1), allocate(WARN_THRESHOLD)); - assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)); - - assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD + 1)); - assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD + 1), allocate(1)); - assertWarns(column, query, collectionBuilder, allocate(1), allocate(WARN_THRESHOLD + 1)); - - assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD + 1)); - assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD + 1), allocate(1)); - assertFails(column, query, collectionBuilder, 
allocate(1), allocate(FAIL_THRESHOLD + 1)); - } - - private void testFrozenCollection(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable - { - assertValid(query, collectionBuilder, allocate(1)); - assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD - 8)); - assertValid(query, collectionBuilder, allocate((WARN_THRESHOLD - 12) / 2), allocate((WARN_THRESHOLD - 12) / 2)); - - assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD - 7)); - assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD - 12), allocate(1)); - - assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD - 7)); - assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD - 12), allocate(1)); - } - - private void testMap(String column, String query) throws Throwable - { - assertValid(query, this::map, allocate(1), allocate(1)); - assertValid(query, this::map, allocate(WARN_THRESHOLD), allocate(1)); - assertValid(query, this::map, allocate(1), allocate(WARN_THRESHOLD)); - assertValid(query, this::map, allocate(WARN_THRESHOLD), allocate(WARN_THRESHOLD)); - - assertWarns(column, query, this::map, allocate(1), allocate(WARN_THRESHOLD + 1)); - assertWarns(column, query, this::map, allocate(WARN_THRESHOLD + 1), allocate(1)); - - assertFails(column, query, this::map, allocate(FAIL_THRESHOLD + 1), allocate(1)); - assertFails(column, query, this::map, allocate(1), allocate(FAIL_THRESHOLD + 1)); - assertFails(column, query, this::map, allocate(FAIL_THRESHOLD + 1), allocate(FAIL_THRESHOLD + 1)); - } - - private void assertValid(String query, ByteBuffer... values) throws Throwable - { - assertValid(() -> execute(query, values)); - } - - private void assertValid(String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... 
values) throws Throwable - { - assertValid(() -> execute(query, collectionBuilder.apply(values))); - } - - private void assertWarns(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) throws Throwable - { - assertWarns(column, query, collectionBuilder.apply(values)); - } - - private void assertWarns(String column, String query, ByteBuffer... values) throws Throwable - { - String errorMessage = format("Value of column %s has size %s, this exceeds the warning threshold of %s.", - column, WARN_THRESHOLD + 1, WARN_THRESHOLD); - assertWarns(() -> execute(query, values), errorMessage); - } - - private void assertFails(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) throws Throwable - { - assertFails(column, query, collectionBuilder.apply(values)); - } - - private void assertFails(String column, String query, ByteBuffer... values) throws Throwable - { - String errorMessage = format("Value of column %s has size %s, this exceeds the failure threshold of %s.", - column, FAIL_THRESHOLD + 1, FAIL_THRESHOLD); - assertFails(() -> execute(query, values), errorMessage); - } - - private void execute(String query, ByteBuffer... values) - { - execute(userClientState, query, Arrays.asList(values)); - } - - private ByteBuffer set(ByteBuffer... values) - { - return SetType.getInstance(BytesType.instance, true).decompose(ImmutableSet.copyOf(values)); - } - - private ByteBuffer list(ByteBuffer... values) - { - return ListType.getInstance(BytesType.instance, true).decompose(ImmutableList.copyOf(values)); - } - - private ByteBuffer map(ByteBuffer... 
values) - { - assert values.length % 2 == 0; - - int size = values.length / 2; - Map<ByteBuffer, ByteBuffer> m = new LinkedHashMap<>(size); - for (int i = 0; i < size; i++) - m.put(values[2 * i], values[(2 * i) + 1]); - - return MapType.getInstance(BytesType.instance, BytesType.instance, true).decompose(m); - } } diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiFrozenTermSizeTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiFrozenTermSizeTest.java new file mode 100644 index 0000000000..50cdb2ab58 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiFrozenTermSizeTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.guardrails; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.junit.Test; + +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.transport.messages.ResultMessage; + +import static java.lang.String.format; +import static org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES; +import static org.junit.Assert.assertEquals; + +/** + * Tests the guardrails around the size of SAI frozen terms + * + * @see Guardrails#saiFrozenTermSize + */ +public class GuardrailSaiFrozenTermSizeTest extends ValueThresholdTester +{ + private static final int WARN_THRESHOLD = 2048; // bytes + private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes + + public GuardrailSaiFrozenTermSizeTest() + { + super(WARN_THRESHOLD + "B", + FAIL_THRESHOLD + "B", + Guardrails.saiFrozenTermSize, + Guardrails::setSaiFrozenTermSizeThreshold, + Guardrails::getSaiFrozenTermSizeWarnThreshold, + Guardrails::getSaiFrozenTermSizeFailThreshold, + bytes -> new DataStorageSpec.LongBytesBound(bytes, BYTES).toString(), + size -> new DataStorageSpec.LongBytesBound(size).toBytes()); + } + + @Override + protected int warnThreshold() + { + return WARN_THRESHOLD; + } + + @Override + protected int failThreshold() + { + return FAIL_THRESHOLD; + } + + @Test + public void testTuple() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, t tuple<text, text>)"); + createIndex("CREATE INDEX ON %s (t) USING 'sai'"); + + testThreshold2("t", "INSERT INTO %s (k, t) VALUES (0, (?, ?))", 8); + testThreshold2("t", "UPDATE %s SET t = (?, ?) 
WHERE k = 0", 8); + } + + @Test + public void testFrozenUDT() throws Throwable + { + String udt = createType("CREATE TYPE %s (a text, b text)"); + createTable(format("CREATE TABLE %%s (k int PRIMARY KEY, v frozen<%s>)", udt)); + createIndex("CREATE INDEX ON %s (v) USING 'sai'"); + + testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, {a: ?})", 8); + testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, {b: ?})", 8); + testThreshold("v", "UPDATE %s SET v = {a: ?} WHERE k = 0", 8); + testThreshold("v", "UPDATE %s SET v = {b: ?} WHERE k = 0", 8); + testThreshold2("v", "INSERT INTO %s (k, v) VALUES (0, {a: ?, b: ?})", 8); + testThreshold2("v", "UPDATE %s SET v = {a: ?, b: ?} WHERE k = 0", 8); + } + + @Test + public void testFrozenList() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, fl frozen<list<text>>)"); + createIndex("CREATE INDEX ON %s (FULL(fl)) USING 'sai'"); + + // the serialized size of a frozen list is the size of its serialized elements, plus a 32-bit integer prefix for + // the number of elements, and another 32-bit integer for the size of each element + + for (String query : Arrays.asList("INSERT INTO %s (k, fl) VALUES (0, ?)", + "UPDATE %s SET fl = ? 
WHERE k = 0")) + { + testFrozenCollection("fl", query, this::list); + } + } + + @Test + public void testWarningTupleOnBuild() + { + ByteBuffer largeTuple = ByteBuffer.allocate(warnThreshold() + 1); + ByteBuffer smallTuple = ByteBuffer.allocate(1); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, t tuple<text>)"); + execute("INSERT INTO %s (k, t) VALUES (0, (?))", largeTuple); + execute("INSERT INTO %s (k, t) VALUES (1, (?))", smallTuple); + createIndex("CREATE INDEX ON %s(t) USING 'sai'"); + + // verify that the large tuple is written on initial index build + assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", largeTuple)).result.size()); + assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", smallTuple)).result.size()); + } + + @Test + public void testFailingTupleOnBuild() + { + ByteBuffer oversizedTuple = ByteBuffer.allocate(failThreshold() + 1); + ByteBuffer smallTuple = ByteBuffer.allocate(1); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, t tuple<text>)"); + execute("INSERT INTO %s (k, t) VALUES (0, (?))", oversizedTuple); + execute("INSERT INTO %s (k, t) VALUES (1, (?))", smallTuple); + createIndex("CREATE INDEX ON %s(t) USING 'sai'"); + + // verify that the oversized tuple isn't written on initial index build + assertEquals(0, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", oversizedTuple)).result.size()); + assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", smallTuple)).result.size()); + } +} diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiStringTermSizeTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiStringTermSizeTest.java new file mode 100644 index 0000000000..868d736fad --- /dev/null +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiStringTermSizeTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.guardrails; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.junit.Test; + +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.transport.messages.ResultMessage; + +import static java.nio.ByteBuffer.allocate; +import static org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES; +import static org.junit.Assert.assertEquals; + +/** + * Tests the guardrails around the size of SAI string terms + * + * @see Guardrails#saiStringTermSize + */ +public class GuardrailSaiStringTermSizeTest extends ValueThresholdTester +{ + private static final int WARN_THRESHOLD = 1024; // bytes + private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes + + public GuardrailSaiStringTermSizeTest() + { + super(WARN_THRESHOLD + "B", + FAIL_THRESHOLD + "B", + Guardrails.saiStringTermSize, + Guardrails::setSaiStringTermSizeThreshold, + Guardrails::getSaiStringTermSizeWarnThreshold, + Guardrails::getSaiStringTermSizeFailThreshold, + bytes -> new DataStorageSpec.LongBytesBound(bytes, BYTES).toString(), + size -> new DataStorageSpec.LongBytesBound(size).toBytes()); + } + + @Override + protected int warnThreshold() + { + return WARN_THRESHOLD; + } + + @Override + protected int 
failThreshold() + { + return FAIL_THRESHOLD; + } + + @Test + public void testCompositePartitionKey() throws Throwable + { + createTable("CREATE TABLE %s (k1 int, k2 text, v int, PRIMARY KEY((k1, k2)))"); + createIndex("CREATE INDEX ON %s (k2) USING 'sai'"); + + testThreshold("k2", "INSERT INTO %s (k1, k2, v) VALUES (0, ?, 0)"); + testThreshold("k2", "UPDATE %s SET v = 1 WHERE k1 = 0 AND k2 = ?"); + } + + @Test + public void testSimpleClustering() throws Throwable + { + createTable("CREATE TABLE %s (k int, c text, v int, PRIMARY KEY(k, c))"); + createIndex("CREATE INDEX ON %s (c) USING 'sai'"); + + testThreshold("c", "INSERT INTO %s (k, c, v) VALUES (0, ?, 0)"); + testThreshold("c", "UPDATE %s SET v = 1 WHERE k = 0 AND c = ?"); + } + + @Test + public void testRegularColumn() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, v text)"); + createIndex("CREATE INDEX ON %s (v) USING 'sai'"); + + testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, ?)"); + testThreshold("v", "UPDATE %s SET v = ? WHERE k = 0"); + } + + @Test + public void testStaticColumn() throws Throwable + { + createTable("CREATE TABLE %s (k int, c int, s text STATIC, r int, PRIMARY KEY(k, c))"); + createIndex("CREATE INDEX ON %s (s) USING 'sai'"); + + testThreshold("s", "INSERT INTO %s (k, s) VALUES (0, ?)"); + testThreshold("s", "INSERT INTO %s (k, c, s, r) VALUES (0, 0, ?, 0)"); + testThreshold("s", "UPDATE %s SET s = ? WHERE k = 0"); + testThreshold("s", "UPDATE %s SET s = ?, r = 0 WHERE k = 0 AND c = 0"); + } + + @Test + public void testList() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<text>)"); + createIndex("CREATE INDEX ON %s (l) USING 'sai'"); + + for (String query : Arrays.asList("INSERT INTO %s (k, l) VALUES (0, ?)", + "UPDATE %s SET l = ? WHERE k = 0", + "UPDATE %s SET l = l + ? WHERE k = 0")) + { + testCollection("l", query, this::list); + } + + testThreshold("l", "UPDATE %s SET l[0] = ? 
WHERE k = 0"); + + String query = "UPDATE %s SET l = l - ? WHERE k = 0"; + assertValid(query, this::list, allocate(1)); + assertValid(query, this::list, allocate(FAIL_THRESHOLD)); + assertValid(query, this::list, allocate(FAIL_THRESHOLD + 1)); // Doesn't write anything because we couldn't write + } + + @Test + public void testBatch() throws Throwable + { + createTable("CREATE TABLE %s (k text, c text, r text, s text STATIC, PRIMARY KEY(k, c))"); + createIndex("CREATE INDEX ON %s (s) USING 'sai'"); + createIndex("CREATE INDEX ON %s (r) USING 'sai'"); + + // static column + testThreshold("s", "BEGIN BATCH INSERT INTO %s (k, s) VALUES ('0', ?); APPLY BATCH;"); + testThreshold("s", "BEGIN BATCH INSERT INTO %s (k, s, c, r) VALUES ('0', ?, '0', '0'); APPLY BATCH;"); + testThreshold("s", "BEGIN BATCH UPDATE %s SET s = ? WHERE k = '0'; APPLY BATCH;"); + testThreshold("s", "BEGIN BATCH UPDATE %s SET s = ?, r = '0' WHERE k = '0' AND c = '0'; APPLY BATCH;"); + + // regular column + testThreshold("r", "BEGIN BATCH INSERT INTO %s (k, c, r) VALUES ('0', '0', ?); APPLY BATCH;"); + testThreshold("r", "BEGIN BATCH UPDATE %s SET r = ? WHERE k = '0' AND c = '0'; APPLY BATCH;"); + } + + @Test + public void testCASWithIfNotExistsCondition() throws Throwable + { + createTable("CREATE TABLE %s (k text, c text, v text, s text STATIC, PRIMARY KEY(k, c))"); + createIndex("CREATE INDEX ON %s (s) USING 'sai'"); + createIndex("CREATE INDEX ON %s (v) USING 'sai'"); + + // static column + assertValid("INSERT INTO %s (k, s) VALUES ('1', ?) IF NOT EXISTS", allocate(1)); + assertValid("INSERT INTO %s (k, s) VALUES ('2', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD)); + assertValid("INSERT INTO %s (k, s) VALUES ('2', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1)); // not applied + assertWarns("s", "INSERT INTO %s (k, s) VALUES ('3', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1)); + + // regular column + assertValid("INSERT INTO %s (k, c, v) VALUES ('4', '0', ?) 
IF NOT EXISTS", allocate(1)); + assertValid("INSERT INTO %s (k, c, v) VALUES ('5', '0', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD)); + assertValid("INSERT INTO %s (k, c, v) VALUES ('5', '0', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1)); // not applied + assertWarns("v", "INSERT INTO %s (k, c, v) VALUES ('6', '0', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1)); + } + + @Test + public void testSelect() throws Throwable + { + createTable("CREATE TABLE %s (k text, c text, r text, s text STATIC, PRIMARY KEY(k, c))"); + createIndex("CREATE INDEX ON %s (c) USING 'sai'"); + createIndex("CREATE INDEX ON %s (r) USING 'sai'"); + createIndex("CREATE INDEX ON %s (s) USING 'sai'"); + + // the guardrail is only checked for writes; reads are excluded + testNoThreshold("SELECT * FROM %s WHERE k = ?"); + testNoThreshold("SELECT * FROM %s WHERE k = '0' AND c = ?"); + testNoThreshold("SELECT * FROM %s WHERE c = ? ALLOW FILTERING"); + testNoThreshold("SELECT * FROM %s WHERE s = ? ALLOW FILTERING"); + testNoThreshold("SELECT * FROM %s WHERE r = ? 
ALLOW FILTERING"); + } + + @Test + public void testWarningTermOnBuild() + { + ByteBuffer largeTerm = allocate(warnThreshold() + 1); + ByteBuffer smallTerm = allocate(1); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v text)"); + execute("INSERT INTO %s (k, v) VALUES (0, ?)", largeTerm); + execute("INSERT INTO %s (k, v) VALUES (1, ?)", smallTerm); + createIndex("CREATE INDEX ON %s(v) USING 'sai'"); + + // verify that the large term is written on initial index build + assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", largeTerm)).result.size()); + assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", smallTerm)).result.size()); + } + + @Test + public void testFailingTermOnBuild() + { + ByteBuffer oversizedTerm = allocate(failThreshold() + 1); + ByteBuffer smallTerm = allocate(1); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v text)"); + execute("INSERT INTO %s (k, v) VALUES (0, ?)", oversizedTerm); + execute("INSERT INTO %s (k, v) VALUES (1, ?)", smallTerm); + createIndex("CREATE INDEX ON %s(v) USING 'sai'"); + + // verify that the oversized term isn't written on initial index build + assertEquals(0, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", oversizedTerm)).result.size()); + assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", smallTerm)).result.size()); + } +} diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiVectorTermSizeTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiVectorTermSizeTest.java new file mode 100644 index 0000000000..6269f4d98c --- /dev/null +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiVectorTermSizeTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.guardrails; + +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.common.primitives.Floats; +import org.junit.Test; + +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.db.marshal.FloatType; +import org.apache.cassandra.db.marshal.VectorType; +import org.apache.cassandra.transport.messages.ResultMessage; + +import static org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES; +import static org.junit.Assert.assertEquals; + +/** + * Tests the guardrails around the size of SAI vector terms + * + * @see Guardrails#saiVectorTermSize + */ +public class GuardrailSaiVectorTermSizeTest extends ValueThresholdTester +{ + private static final int WARN_THRESHOLD = 1024; // bytes + private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes + + public GuardrailSaiVectorTermSizeTest() + { + super(WARN_THRESHOLD + "B", + FAIL_THRESHOLD + "B", + Guardrails.saiVectorTermSize, + Guardrails::setSaiVectorTermSizeThreshold, + Guardrails::getSaiVectorTermSizeWarnThreshold, + Guardrails::getSaiVectorTermSizeFailThreshold, + bytes -> new DataStorageSpec.LongBytesBound(bytes, BYTES).toString(), + size -> new DataStorageSpec.LongBytesBound(size).toBytes()); + } + + @Override + protected int warnThreshold() + { + return WARN_THRESHOLD; + } + + @Override + protected int failThreshold() + { + return 
FAIL_THRESHOLD; + } + + @Test + public void testWarn() throws Throwable + { + int warnDimensions = warnThreshold() / 4; // 4 bytes per dimension + List<Float> warnVector = Floats.asList(new float[warnDimensions + 1]); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + warnVector.size() + ">)"); + createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); + + VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, warnDimensions + 1); + assertWarns(() -> execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorType.decompose(warnVector)), + "Value of column 'v' has size"); + } + + @Test + public void testFail() throws Throwable + { + int failDimensions = failThreshold() / 4; // 4 bytes per dimension + List<Float> failVector = Floats.asList(new float[failDimensions + 1]); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + failVector.size() + ">)"); + createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); + + VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, failDimensions + 1); + assertFails(() -> execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorType.decompose(failVector)), + "Value of column 'v' has size"); + } + + @Test + public void testWarningVectorOnBuild() + { + int warnDimensions = warnThreshold() / 4; // 4 bytes per dimension + List<Float> largeVector = Floats.asList(new float[warnDimensions + 1]); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + largeVector.size() + ">)"); + + VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, warnDimensions + 1); + ByteBuffer vectorBytes = vectorType.decompose(largeVector); + execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorBytes); + + createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); + + // verify that the large 
vector is written on initial index build + assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", vectorBytes)).result.size()); + } + + @Test + public void testFailingVectorOnBuild() + { + int failDimensions = failThreshold() / 4; // 4 bytes per dimension + List<Float> oversizedVector = Floats.asList(new float[failDimensions + 1]); + + createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + oversizedVector.size() + ">)"); + + VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, failDimensions + 1); + ByteBuffer vectorBytes = vectorType.decompose(oversizedVector); + execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorBytes); + + createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); + + // verify that the oversized vector isn't written on initial index build + assertEquals(1, ((ResultMessage.Rows) execute("SELECT k, v FROM %s")).result.size()); + assertEquals(0, ((ResultMessage.Rows) execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", vectorBytes)).result.size()); + } +} diff --git a/test/unit/org/apache/cassandra/db/guardrails/ValueThresholdTester.java b/test/unit/org/apache/cassandra/db/guardrails/ValueThresholdTester.java new file mode 100644 index 0000000000..95cc599796 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/guardrails/ValueThresholdTester.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.guardrails; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.ToLongFunction; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.transport.messages.ResultMessage; + +import static java.lang.String.format; +import static java.nio.ByteBuffer.allocate; + +public abstract class ValueThresholdTester extends ThresholdTester +{ + protected ValueThresholdTester(String warnThreshold, + String failThreshold, + Threshold threshold, + TriConsumer<Guardrails, String, String> setter, + Function<Guardrails, String> warnGetter, + Function<Guardrails, String> failGetter, + Function<Long, String> stringFormatter, + ToLongFunction<String> stringParser) + { + super(warnThreshold, + failThreshold, + threshold, + setter, + warnGetter, + failGetter, + stringFormatter, + stringParser); + } + + protected abstract int warnThreshold(); + + protected abstract int failThreshold(); + + /** + * Tests that the max column size guardrail threshold is not applied for the specified 1-placeholder CQL query. 
+ * + * @param query a CQL modification statement with exactly one placeholder + */ + protected void testNoThreshold(String query) throws Throwable + { + assertValid(query, allocate(1)); + + assertValid(query, allocate(warnThreshold())); + assertValid(query, allocate(warnThreshold() + 1)); + + assertValid(query, allocate(failThreshold())); + assertValid(query, allocate(failThreshold() + 1)); + } + + /** + * Tests that the max column size guardrail threshold is not applied for the specified 2-placeholder CQL query. + * + * @param query a CQL modification statement with exactly two placeholders + */ + protected void testNoThreshold2(String query) throws Throwable + { + assertValid(query, allocate(1), allocate(1)); + + assertValid(query, allocate(warnThreshold()), allocate(1)); + assertValid(query, allocate(1), allocate(warnThreshold())); + assertValid(query, allocate((warnThreshold())), allocate((warnThreshold()))); + assertValid(query, allocate(warnThreshold() + 1), allocate(1)); + assertValid(query, allocate(1), allocate(warnThreshold() + 1)); + + assertValid(query, allocate(failThreshold()), allocate(1)); + assertValid(query, allocate(1), allocate(failThreshold())); + assertValid(query, allocate((failThreshold())), allocate((failThreshold()))); + assertValid(query, allocate(failThreshold() + 1), allocate(1)); + assertValid(query, allocate(1), allocate(failThreshold() + 1)); + } + + /** + * Tests that the max column size guardrail threshold is applied for the specified 1-placeholder CQL query. + * + * @param column the name of the column referenced by the query placeholder + * @param query a CQL query with exactly one placeholder + */ + protected void testThreshold(String column, String query) throws Throwable + { + testThreshold(column, query, 0); + } + + /** + * Tests that the max column size guardrail threshold is applied for the specified 1-placeholder CQL query. 
+ * + * @param column the name of the column referenced by the query placeholder + * @param query a CQL query with exactly one placeholder + * @param serializationBytes the extra bytes added to the placeholder value by its wrapping column type serializer + */ + protected void testThreshold(String column, String query, int serializationBytes) throws Throwable + { + int warn = warnThreshold() - serializationBytes; + int fail = failThreshold() - serializationBytes; + + assertValid(query, allocate(0)); + assertValid(query, allocate(warn)); + assertWarns(column, query, allocate(warn + 1)); + assertFails(column, query, allocate(fail + 1)); + } + + /** + * Tests that the max column size guardrail threshold is applied for the specified 2-placeholder CQL query. + * + * @param column the name of the column referenced by the placeholders + * @param query a CQL query with exactly two placeholders + */ + protected void testThreshold2(String column, String query) throws Throwable + { + testThreshold2(column, query, 0); + } + + /** + * Tests that the max column size guardrail threshold is applied for the specified 2-placeholder query. 
+ * + * @param column the name of the column referenced by the placeholders + * @param query a CQL query with exactly two placeholders + * @param serializationBytes the extra bytes added to the size of the placeholder value by their wrapping serializer + */ + protected void testThreshold2(String column, String query, int serializationBytes) throws Throwable + { + int warn = warnThreshold() - serializationBytes; + int fail = failThreshold() - serializationBytes; + + assertValid(query, allocate(0), allocate(0)); + assertValid(query, allocate(warn), allocate(0)); + assertValid(query, allocate(0), allocate(warn)); + assertValid(query, allocate(warn / 2), allocate(warn / 2)); + + assertWarns(column, query, allocate(warn + 1), allocate(0)); + assertWarns(column, query, allocate(0), allocate(warn + 1)); + + assertFails(column, query, allocate(fail + 1), allocate(0)); + assertFails(column, query, allocate(0), allocate(fail + 1)); + } + + protected void testCollection(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable + { + assertValid(query, collectionBuilder, allocate(1)); + assertValid(query, collectionBuilder, allocate(1), allocate(1)); + assertValid(query, collectionBuilder, allocate(warnThreshold())); + assertValid(query, collectionBuilder, allocate(warnThreshold()), allocate(1)); + assertValid(query, collectionBuilder, allocate(1), allocate(warnThreshold())); + assertValid(query, collectionBuilder, allocate(warnThreshold()), allocate(warnThreshold())); + + assertWarns(column, query, collectionBuilder, allocate(warnThreshold() + 1)); + assertWarns(column, query, collectionBuilder, allocate(warnThreshold() + 1), allocate(1)); + assertWarns(column, query, collectionBuilder, allocate(1), allocate(warnThreshold() + 1)); + + assertFails(column, query, collectionBuilder, allocate(failThreshold() + 1)); + assertFails(column, query, collectionBuilder, allocate(failThreshold() + 1), allocate(1)); + assertFails(column, query, 
collectionBuilder, allocate(1), allocate(failThreshold() + 1)); + } + + protected void testFrozenCollection(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable + { + assertValid(query, collectionBuilder, allocate(1)); + assertValid(query, collectionBuilder, allocate(warnThreshold() - 8)); + assertValid(query, collectionBuilder, allocate((warnThreshold() - 12) / 2), allocate((warnThreshold() - 12) / 2)); + + assertWarns(column, query, collectionBuilder, allocate(warnThreshold() - 7)); + assertWarns(column, query, collectionBuilder, allocate(warnThreshold() - 12), allocate(1)); + + assertFails(column, query, collectionBuilder, allocate(failThreshold() - 7)); + assertFails(column, query, collectionBuilder, allocate(failThreshold() - 12), allocate(1)); + } + + protected void testMap(String column, String query) throws Throwable + { + assertValid(query, this::map, allocate(1), allocate(1)); + assertValid(query, this::map, allocate(warnThreshold()), allocate(1)); + assertValid(query, this::map, allocate(1), allocate(warnThreshold())); + assertValid(query, this::map, allocate(warnThreshold()), allocate(warnThreshold())); + + assertWarns(column, query, this::map, allocate(1), allocate(warnThreshold() + 1)); + assertWarns(column, query, this::map, allocate(warnThreshold() + 1), allocate(1)); + + assertFails(column, query, this::map, allocate(failThreshold() + 1), allocate(1)); + assertFails(column, query, this::map, allocate(1), allocate(failThreshold() + 1)); + assertFails(column, query, this::map, allocate(failThreshold() + 1), allocate(failThreshold() + 1)); + } + + protected void assertValid(String query, ByteBuffer... values) throws Throwable + { + assertValid(() -> execute(query, values)); + } + + protected void assertValid(String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... 
values) throws Throwable + { + assertValid(() -> execute(query, collectionBuilder.apply(values))); + } + + protected void assertWarns(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) throws Throwable + { + assertWarns(column, query, collectionBuilder.apply(values)); + } + + protected void assertWarns(String column, String query, ByteBuffer... values) throws Throwable + { + String errorMessage = format("Value of column '%s' has size %s, this exceeds the warning threshold of %s.", + column, warnThreshold() + 1, warnThreshold()); + assertWarns(() -> execute(query, values), errorMessage); + } + + protected void assertFails(String column, String query, Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) throws Throwable + { + assertFails(column, query, collectionBuilder.apply(values)); + } + + protected void assertFails(String column, String query, ByteBuffer... values) throws Throwable + { + String errorMessage = format("Value of column '%s' has size %s, this exceeds the failure threshold of %s.", + column, failThreshold() + 1, failThreshold()); + assertFails(() -> execute(query, values), errorMessage); + } + + protected ResultMessage execute(String query, ByteBuffer... values) + { + return execute(userClientState, query, Arrays.asList(values)); + } + + protected ByteBuffer set(ByteBuffer... values) + { + return SetType.getInstance(BytesType.instance, true).decompose(ImmutableSet.copyOf(values)); + } + + protected ByteBuffer list(ByteBuffer... values) + { + return ListType.getInstance(BytesType.instance, true).decompose(ImmutableList.copyOf(values)); + } + + protected ByteBuffer map(ByteBuffer... 
values) + { + assert values.length % 2 == 0; + + int size = values.length / 2; + Map<ByteBuffer, ByteBuffer> m = new LinkedHashMap<>(size); + for (int i = 0; i < size; i++) + m.put(values[2 * i], values[(2 * i) + 1]); + + return MapType.getInstance(BytesType.instance, BytesType.instance, true).decompose(m); + } +} diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java index c497c79049..cfaff698ec 100644 --- a/test/unit/org/apache/cassandra/index/StubIndex.java +++ b/test/unit/org/apache/cassandra/index/StubIndex.java @@ -35,6 +35,7 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.Pair; /** @@ -200,7 +201,8 @@ public class StubIndex implements Index return 0; } - public void validate(PartitionUpdate update) throws InvalidRequestException + @Override + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException { } diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java index f67f2666b9..672777cc25 100644 --- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java +++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java @@ -56,6 +56,7 @@ import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Refs; @@ -224,7 +225,8 @@ 
public class CustomCassandraIndex implements Index return null; } - public void validate(PartitionUpdate update) throws InvalidRequestException + @Override + public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException { switch (indexedColumn.kind) { diff --git a/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java b/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java index 55390b3f5c..91bff0de97 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java @@ -33,7 +33,6 @@ import accord.utils.Gens; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.DecimalType; -import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.utils.AbstractTypeGenerators; import org.apache.cassandra.utils.Generators; @@ -42,10 +41,8 @@ import org.quicktheories.generators.SourceDSL; @RunWith(Parameterized.class) public class AllTypesSimpleEqTest extends AbstractSimpleEqTestBase { - private static final Map<AbstractType<?>, Long> LARGE_DOMAIN_FAILING_SEEDS = - Map.of(UTF8Type.instance, -4379508235061872764L); - private static final Map<AbstractType<?>, Long> SHORT_DOMAIN_FAILING_SEEDS = - Map.of(UTF8Type.instance, -4379508235061872764L); + private static final Map<AbstractType<?>, Long> LARGE_DOMAIN_FAILING_SEEDS = Map.of(); + private static final Map<AbstractType<?>, Long> SHORT_DOMAIN_FAILING_SEEDS = Map.of(); private final AbstractType<?> type; @@ -59,8 +56,6 @@ public class AllTypesSimpleEqTest extends AbstractSimpleEqTestBase { return StorageAttachedIndex.SUPPORTED_TYPES.stream() .map(CQL3Type::getType) - // TODO: Track down unicode character edge cases... 
- .filter(t -> t != UTF8Type.instance) .distinct() .map(t -> new Object[]{ t }) .collect(Collectors.toList()); diff --git a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java index b7e61e3d7e..cc2e2f4b13 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java @@ -21,7 +21,6 @@ package org.apache.cassandra.index.sai.cql; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.EnumSet; import java.util.Iterator; @@ -41,7 +40,6 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.exceptions.InvalidConfigurationInQueryException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.ReadFailureException; -import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.ColumnIdentifier; @@ -78,7 +76,6 @@ import org.apache.cassandra.inject.InvokePointBuilder; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; import org.assertj.core.api.Assertions; import org.mockito.Mockito; @@ -633,50 +630,6 @@ public class StorageAttachedIndexDDLTest extends SAITester assertThatThrownBy(() -> executeNet("SELECT id1 FROM %s WHERE v1>=0")).isInstanceOf(ReadFailureException.class); } - @Test - public void testMaxTermSize() throws Throwable - { - String largeTerm = UTF8Type.instance.compose(ByteBuffer.allocate(FBUtilities.MAX_UNSIGNED_SHORT / 2 + 1)); - int maxFloatVectorDimensions = (int) 
(CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes() / 4); // 4 bytes per dimension - Vector<Float> largeVector = vector(new float[maxFloatVectorDimensions + 1]); - - createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, r text, m map<text, text>, v vector<float, " + largeVector.size() + ">)"); - createIndex("CREATE CUSTOM INDEX ON %s(r) USING 'StorageAttachedIndex'"); - createIndex("CREATE CUSTOM INDEX ON %s(ENTRIES(m)) USING 'StorageAttachedIndex'"); - createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); - - // verify that a write exceeding max term size is accepted with client warnings - ResultSet resultSet = executeNet("INSERT INTO %s (k, r, m, v) VALUES (0, ?, {'" + largeTerm + "': ''}, " + largeVector + ')', largeTerm); - List<String> warnings = resultSet.getExecutionInfo().getWarnings(); - warnings.sort(String::compareTo); - assertEquals(3, warnings.size()); - assertTrue(warnings.get(0).contains("Can't add term of column m")); - assertTrue(warnings.get(1).contains("Can't add term of column r")); - assertTrue(warnings.get(2).contains("Can't add term of column v")); - - // verify that the large terms aren't written into the memtable indexes - assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, map(largeTerm, ""), largeVector)); - assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm)); - assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm)); - assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", largeVector)); - - // verify that the large terms aren't written into the sstable indexes after flush - flush(); - assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, map(largeTerm, ""), largeVector)); - assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm)); - assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm)); - assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? 
LIMIT 10", largeVector)); - - // verify that the large terms aren't written into the sstable indexes after compactions - executeNet("INSERT INTO %s (k, r, m, v) VALUES (0, ?, {'" + largeTerm + "': ''}, " + largeVector + ')', largeTerm); - flush(); - compact(); - assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, map(largeTerm, ""), largeVector)); - assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm)); - assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm)); - assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", largeVector)); - } - @Test public void shouldReleaseIndexFilesAfterDroppingLastIndex() throws Throwable { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org