This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 9bfaee91c4 Optionally fail writes when SAI refuses to index a term 
value exceeding a configured maximum size
9bfaee91c4 is described below

commit 9bfaee91c4fd7a269e3ff924e8a504bad5d6514a
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Tue Apr 9 17:32:56 2024 -0500

    Optionally fail writes when SAI refuses to index a term value exceeding a 
configured maximum size
    
    patch by Caleb Rackliffe; reviewed by Berenguer Blasi and Stefan Miklosovic 
for CASSANDRA-19493
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   1 +
 conf/cassandra.yaml                                |  12 +
 .../config/CassandraRelevantProperties.java        |   3 -
 src/java/org/apache/cassandra/config/Config.java   |  11 +-
 .../apache/cassandra/config/GuardrailsOptions.java |  84 +++++++
 .../cassandra/cql3/statements/BatchStatement.java  |   2 +-
 .../cql3/statements/BatchUpdatesCollector.java     |  23 +-
 .../cassandra/cql3/statements/CQL3CasRequest.java  |   2 +-
 .../cql3/statements/ModificationStatement.java     |   2 +-
 .../statements/SingleTableUpdatesCollector.java    |   6 +-
 .../cql3/statements/UpdatesCollector.java          |   3 +-
 src/java/org/apache/cassandra/db/IMutation.java    |  27 +-
 .../apache/cassandra/db/guardrails/Guardrails.java |  98 +++++++-
 .../cassandra/db/guardrails/GuardrailsConfig.java  |  54 ++++
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  72 ++++++
 .../cassandra/db/partitions/PartitionUpdate.java   |   5 +-
 .../cassandra/db/virtual/VirtualMutation.java      |   3 +-
 src/java/org/apache/cassandra/index/Index.java     |   5 +-
 .../org/apache/cassandra/index/IndexRegistry.java  |  41 ++--
 .../cassandra/index/SecondaryIndexManager.java     |   7 +-
 .../cassandra/index/internal/CassandraIndex.java   |   4 +-
 .../cassandra/index/sai/StorageAttachedIndex.java  |  77 +++---
 .../index/sai/disk/v1/SSTableIndexWriter.java      |   2 +-
 .../index/sai/memory/TrieMemoryIndex.java          |   2 +-
 .../index/sai/memory/VectorMemoryIndex.java        |   2 +-
 .../org/apache/cassandra/index/sasi/SASIIndex.java |   4 +-
 .../paxos/uncommitted/PaxosUncommittedIndex.java   |   4 +-
 .../cassandra/anttasks/TestNameCheckTask.java      |  26 +-
 .../guardrails/GuardrailColumnValueSizeTest.java   | 237 ++----------------
 .../guardrails/GuardrailSaiFrozenTermSizeTest.java | 139 +++++++++++
 .../guardrails/GuardrailSaiStringTermSizeTest.java | 215 ++++++++++++++++
 .../guardrails/GuardrailSaiVectorTermSizeTest.java | 133 ++++++++++
 .../db/guardrails/ValueThresholdTester.java        | 273 +++++++++++++++++++++
 .../unit/org/apache/cassandra/index/StubIndex.java |   4 +-
 .../index/internal/CustomCassandraIndex.java       |   4 +-
 .../index/sai/cql/AllTypesSimpleEqTest.java        |   9 +-
 .../index/sai/cql/StorageAttachedIndexDDLTest.java |  47 ----
 38 files changed, 1267 insertions(+), 377 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 74d142089c..09c5468db4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-beta2
+ * Optionally fail writes when SAI refuses to index a term value exceeding 
a configured maximum size (CASSANDRA-19493)
  * Vector search can restrict on clustering keys when filtering isn't required 
(CASSANDRA-19544)
  * Fix FBUtilities' parsing of gcp cos_containerd kernel versions 
(CASSANDRA-18594)
  * Clean up KeyRangeIterator classes (CASSANDRA-19428)
diff --git a/NEWS.txt b/NEWS.txt
index c074867069..1ba8f6639e 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -126,6 +126,7 @@ New features
       - Vector dimensions
       - Whether it is possible to execute secondary index queries without 
restricting on partition key
       - Warning and failure thresholds for maximum referenced SAI indexes on a 
replica when executing a SELECT query
+      - Warning and failure thresholds for the size of terms written to an SAI 
index
     - It is possible to list ephemeral snapshots by nodetool listsnaphots 
command when flag "-e" is specified.
     - Added a new flag to `nodetool profileload` and JMX endpoint to set up 
recurring profile load generation on specified
       intervals (see CASSANDRA-17821)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 961c607f93..99dd449c84 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -2152,6 +2152,18 @@ drop_compact_storage_enabled: false
 # before emitting a failure (defaults to -1 to disable)
 #sai_sstable_indexes_per_query_fail_threshold: -1
 
+# Guardrail specifying warn/fail thresholds for the size of string terms 
written to an SAI index
+# sai_string_term_size_warn_threshold: 1KiB
+# sai_string_term_size_fail_threshold: 8KiB
+
+# Guardrail specifying warn/fail thresholds for the size of frozen terms 
written to an SAI index
+# sai_frozen_term_size_warn_threshold: 1KiB
+# sai_frozen_term_size_fail_threshold: 8KiB
+
+# Guardrail specifying warn/fail thresholds for the size of vector terms 
written to an SAI index
+# sai_vector_term_size_warn_threshold: 16KiB
+# sai_vector_term_size_fail_threshold: 32KiB
+
 # The default secondary index implementation when CREATE INDEX does not 
specify one via USING.
 # ex. "legacy_local_table" - (default) legacy secondary index, implemented as 
a hidden table
 # ex. "sai" - "storage-attched" index, implemented via optimized 
SSTable/Memtable-attached indexes
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index d36530f51f..e0dfeb19ff 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -425,9 +425,6 @@ public enum CassandraRelevantProperties
     SAI_INTERSECTION_CLAUSE_LIMIT("cassandra.sai.intersection_clause_limit", 
"2"),
     /** Latest version to be used for SAI index writing */
     SAI_LATEST_VERSION("cassandra.sai.latest_version", "aa"),
-    SAI_MAX_FROZEN_TERM_SIZE("cassandra.sai.max_frozen_term_size", "5KiB"),
-    SAI_MAX_STRING_TERM_SIZE("cassandra.sai.max_string_term_size", "1KiB"),
-    SAI_MAX_VECTOR_TERM_SIZE("cassandra.sai.max_vector_term_size", "32KiB"),
 
     /** Minimum number of reachable leaves for a given node to be eligible for 
an auxiliary posting list */
     SAI_MINIMUM_POSTINGS_LEAVES("cassandra.sai.minimum_postings_leaves", "64"),
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index aa0f3ee476..30da524777 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -903,11 +903,18 @@ public class Config
     public volatile boolean zero_ttl_on_twcs_warned = true;
     public volatile boolean zero_ttl_on_twcs_enabled = true;
     public volatile boolean non_partition_restricted_index_query_enabled = 
true;
-    public volatile int sai_sstable_indexes_per_query_warn_threshold = 32;
-    public volatile int sai_sstable_indexes_per_query_fail_threshold = -1;
     public volatile boolean intersect_filtering_query_warned = true;
     public volatile boolean intersect_filtering_query_enabled = true;
 
+    public volatile int sai_sstable_indexes_per_query_warn_threshold = 32;
+    public volatile int sai_sstable_indexes_per_query_fail_threshold = -1;
+    public volatile DataStorageSpec.LongBytesBound 
sai_string_term_size_warn_threshold = new 
DataStorageSpec.LongBytesBound("1KiB");
+    public volatile DataStorageSpec.LongBytesBound 
sai_string_term_size_fail_threshold = new 
DataStorageSpec.LongBytesBound("8KiB");
+    public volatile DataStorageSpec.LongBytesBound 
sai_frozen_term_size_warn_threshold = new 
DataStorageSpec.LongBytesBound("1KiB");
+    public volatile DataStorageSpec.LongBytesBound 
sai_frozen_term_size_fail_threshold = new 
DataStorageSpec.LongBytesBound("8KiB");
+    public volatile DataStorageSpec.LongBytesBound 
sai_vector_term_size_warn_threshold = new 
DataStorageSpec.LongBytesBound("16KiB");
+    public volatile DataStorageSpec.LongBytesBound 
sai_vector_term_size_fail_threshold = new 
DataStorageSpec.LongBytesBound("32KiB");
+
     public volatile DurationSpec.LongNanosecondsBound streaming_state_expires 
= new DurationSpec.LongNanosecondsBound("3d");
     public volatile DataStorageSpec.LongBytesBound streaming_state_size = new 
DataStorageSpec.LongBytesBound("40MiB");
 
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index 8d7bd52197..504384e0d0 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -960,6 +960,90 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> 
config.sai_sstable_indexes_per_query_fail_threshold = x);
     }
 
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getSaiStringTermSizeWarnThreshold()
+    {
+        return config.sai_string_term_size_warn_threshold;
+    }
+
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getSaiStringTermSizeFailThreshold()
+    {
+        return config.sai_string_term_size_fail_threshold;
+    }
+
+    @Override
+    public void setSaiStringTermSizeThreshold(@Nullable 
DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound 
fail)
+    {
+        validateSizeThreshold(warn, fail, false, "sai_string_term_size");
+        updatePropertyWithLogging("sai_string_term_size_warn_threshold",
+                                  warn,
+                                  () -> 
config.sai_string_term_size_warn_threshold,
+                                  x -> 
config.sai_string_term_size_warn_threshold = x);
+        updatePropertyWithLogging("sai_string_term_size_fail_threshold",
+                                  fail,
+                                  () -> 
config.sai_string_term_size_fail_threshold,
+                                  x -> 
config.sai_string_term_size_fail_threshold = x);
+    }
+
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeWarnThreshold()
+    {
+        return config.sai_frozen_term_size_warn_threshold;
+    }
+
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeFailThreshold()
+    {
+        return config.sai_frozen_term_size_fail_threshold;
+    }
+
+    @Override
+    public void setSaiFrozenTermSizeThreshold(@Nullable 
DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound 
fail)
+    {
+        validateSizeThreshold(warn, fail, false, "sai_frozen_term_size");
+        updatePropertyWithLogging("sai_frozen_term_size_warn_threshold",
+                                  warn,
+                                  () -> 
config.sai_frozen_term_size_warn_threshold,
+                                  x -> 
config.sai_frozen_term_size_warn_threshold = x);
+        updatePropertyWithLogging("sai_frozen_term_size_fail_threshold",
+                                  fail,
+                                  () -> 
config.sai_frozen_term_size_fail_threshold,
+                                  x -> 
config.sai_frozen_term_size_fail_threshold = x);
+    }
+
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getSaiVectorTermSizeWarnThreshold()
+    {
+        return config.sai_vector_term_size_warn_threshold;
+    }
+
+    @Override
+    @Nullable
+    public DataStorageSpec.LongBytesBound getSaiVectorTermSizeFailThreshold()
+    {
+        return config.sai_vector_term_size_fail_threshold;
+    }
+
+    @Override
+    public void setSaiVectorTermSizeThreshold(@Nullable 
DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound 
fail)
+    {
+        validateSizeThreshold(warn, fail, false, "sai_vector_term_size");
+        updatePropertyWithLogging("sai_vector_term_size_warn_threshold",
+                                  warn,
+                                  () -> 
config.sai_vector_term_size_warn_threshold,
+                                  x -> 
config.sai_vector_term_size_warn_threshold = x);
+        updatePropertyWithLogging("sai_vector_term_size_fail_threshold",
+                                  fail,
+                                  () -> 
config.sai_vector_term_size_fail_threshold,
+                                  x -> 
config.sai_vector_term_size_fail_threshold = x);
+    }
+
     @Override
     public boolean getNonPartitionRestrictedQueryEnabled()
     {
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 9f5ac2ff26..eea09100b0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -320,7 +320,7 @@ public class BatchStatement implements CQLStatement
             
ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING,
 new Object[] { suffix, tablesWithZeroGcGs })
                                                      .getMessage());
         }
-        return collector.toMutations();
+        return collector.toMutations(state);
     }
 
     /**
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index c346eb9671..521cd2afa6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -18,19 +18,28 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.virtual.VirtualMutation;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.service.ClientState;
 
 import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
@@ -122,9 +131,13 @@ final class BatchUpdatesCollector implements 
UpdatesCollector
 
     /**
      * Returns a collection containing all the mutations.
+     * 
+     * @param state state related to the client connection
+     * 
      * @return a collection containing all the mutations.
      */
-    public List<IMutation> toMutations()
+    @Override
+    public List<IMutation> toMutations(ClientState state)
     {
         List<IMutation> ms = new ArrayList<>();
         for (Map<ByteBuffer, IMutationBuilder> ksMap : 
mutationBuilders.values())
@@ -132,7 +145,7 @@ final class BatchUpdatesCollector implements 
UpdatesCollector
             for (IMutationBuilder builder : ksMap.values())
             {
                 IMutation mutation = builder.build();
-                mutation.validateIndexedColumns();
+                mutation.validateIndexedColumns(state);
                 mutation.validateSize(MessagingService.current_version, 
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
                 ms.add(mutation);
             }
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 02721dd99d..9671592c16 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -243,7 +243,7 @@ public class CQL3CasRequest implements CASRequest
             upd.applyUpdates(current, updateBuilder, clientState);
 
         PartitionUpdate partitionUpdate = updateBuilder.build();
-        IndexRegistry.obtain(metadata).validate(partitionUpdate);
+        IndexRegistry.obtain(metadata).validate(partitionUpdate, clientState);
 
         return partitionUpdate;
     }
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 1f8f3f037a..61fbd6f391 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -747,7 +747,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         HashMultiset<ByteBuffer> perPartitionKeyCounts = 
HashMultiset.create(keys);
         SingleTableUpdatesCollector collector = new 
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
         addUpdates(collector, keys, state, options, local, timestamp, 
nowInSeconds, queryStartNanoTime);
-        return collector.toMutations();
+        return collector.toMutations(state);
     }
 
     final void addUpdates(UpdatesCollector collector,
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 14b1660764..5ff299eb88 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.virtual.VirtualMutation;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * Utility class to collect updates.
@@ -92,7 +93,8 @@ final class SingleTableUpdatesCollector implements 
UpdatesCollector
      * Returns a collection containing all the mutations.
      * @return a collection containing all the mutations.
      */
-    public List<IMutation> toMutations()
+    @Override
+    public List<IMutation> toMutations(ClientState state)
     {
         List<IMutation> ms = new ArrayList<>(puBuilders.size());
         for (PartitionUpdate.Builder builder : puBuilders.values())
@@ -106,7 +108,7 @@ final class SingleTableUpdatesCollector implements 
UpdatesCollector
             else
                 mutation = new Mutation(builder.build());
 
-            mutation.validateIndexedColumns();
+            mutation.validateIndexedColumns(state);
             mutation.validateSize(MessagingService.current_version, 
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
             ms.add(mutation);
         }
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java 
b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index c3dd334971..40b75ab5fa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -25,9 +25,10 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 
 public interface UpdatesCollector
 {
     PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, 
DecoratedKey dk, ConsistencyLevel consistency);
-    List<IMutation> toMutations();
+    List<IMutation> toMutations(ClientState state);
 }
diff --git a/src/java/org/apache/cassandra/db/IMutation.java 
b/src/java/org/apache/cassandra/db/IMutation.java
index 801c9a9c8c..1998e2c035 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -24,24 +24,25 @@ import java.util.function.Supplier;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ClientState;
 
 public interface IMutation
 {
-    public long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
+    long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
 
-    public void apply();
-    public String getKeyspaceName();
-    public Collection<TableId> getTableIds();
-    public DecoratedKey key();
-    public long getTimeout(TimeUnit unit);
-    public String toString(boolean shallow);
-    public Collection<PartitionUpdate> getPartitionUpdates();
-    public Supplier<Mutation> hintOnFailure();
+    void apply();
+    String getKeyspaceName();
+    Collection<TableId> getTableIds();
+    DecoratedKey key();
+    long getTimeout(TimeUnit unit);
+    String toString(boolean shallow);
+    Collection<PartitionUpdate> getPartitionUpdates();
+    Supplier<Mutation> hintOnFailure();
 
-    public default void validateIndexedColumns()
+    default void validateIndexedColumns(ClientState state)
     {
         for (PartitionUpdate pu : getPartitionUpdates())
-            pu.validateIndexedColumns();
+            pu.validateIndexedColumns(state);
     }
 
     /**
@@ -52,14 +53,14 @@ public interface IMutation
      * @param overhead overhadd to add for mutation size to validate. Pass 
zero if not required but not a negative value.
      * @throws MutationExceededMaxSizeException if {@link 
DatabaseDescriptor#getMaxMutationSize()} is exceeded
       */
-    public void validateSize(int version, int overhead);
+    void validateSize(int version, int overhead);
 
     /**
      * Computes the total data size of the specified mutations.
      * @param mutations the mutations
      * @return the total data size of the specified mutations
      */
-    public static long dataSize(Collection<? extends IMutation> mutations)
+    static long dataSize(Collection<? extends IMutation> mutations)
     {
         long size = 0;
         for (IMutation mutation : mutations)
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 01157bf345..f61ae204d1 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -359,7 +359,7 @@ public final class Guardrails implements GuardrailsMBean
                      state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getColumnValueSizeWarnThreshold()),
                      state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getColumnValueSizeFailThreshold()),
                      (isWarning, what, value, threshold) ->
-                     format("Value of column %s has size %s, this exceeds the 
%s threshold of %s.",
+                     format("Value of column '%s' has size %s, this exceeds 
the %s threshold of %s.",
                             what, value, isWarning ? "warning" : "failure", 
threshold));
 
     /**
@@ -501,6 +501,42 @@ public final class Guardrails implements GuardrailsMBean
                       format("The number of SSTable indexes queried on index 
%s violated %s threshold value %s with value %s",
                              what, isWarning ? "warning" : "failure", 
threshold, value)));
 
+    /**
+     * Guardrail on the size of a string term written to an SAI index.
+     */
+    public static final MaxThreshold saiStringTermSize =
+    new MaxThreshold("sai_string_term_size",
+                     null,
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiStringTermSizeWarnThreshold()),
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiStringTermSizeFailThreshold()),
+                     (isWarning, what, value, threshold) ->
+                     format("Value of column '%s' has size %s, this exceeds 
the %s threshold of %s.",
+                            what, value, isWarning ? "warning" : "failure", 
threshold));
+
+    /**
+     * Guardrail on the size of a frozen term written to an SAI index.
+     */
+    public static final MaxThreshold saiFrozenTermSize =
+    new MaxThreshold("sai_frozen_term_size",
+                     null,
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiFrozenTermSizeWarnThreshold()),
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiFrozenTermSizeFailThreshold()),
+                     (isWarning, what, value, threshold) ->
+                     format("Value of column '%s' has size %s, this exceeds 
the %s threshold of %s.",
+                            what, value, isWarning ? "warning" : "failure", 
threshold));
+
+    /**
+     * Guardrail on the size of a vector term written to an SAI index.
+     */
+    public static final MaxThreshold saiVectorTermSize =
+    new MaxThreshold("sai_vector_term_size",
+                     null,
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiVectorTermSizeWarnThreshold()),
+                     state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getSaiVectorTermSizeFailThreshold()),
+                     (isWarning, what, value, threshold) ->
+                     format("Value of column '%s' has size %s, this exceeds 
the %s threshold of %s.",
+                            what, value, isWarning ? "warning" : "failure", 
threshold));
+
     public static final EnableFlag nonPartitionRestrictedIndexQueryEnabled =
     new EnableFlag("non_partition_restricted_index_query_enabled",
                    "Executing a query on secondary indexes without partition 
key restriction might degrade performance",
@@ -1248,6 +1284,66 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.setSaiSSTableIndexesPerQueryThreshold(warn, fail);
     }
 
+    @Override
+    @Nullable
+    public String getSaiStringTermSizeWarnThreshold()
+    {
+        return 
sizeToString(DEFAULT_CONFIG.getSaiStringTermSizeWarnThreshold());
+    }
+
+    @Override
+    @Nullable
+    public String getSaiStringTermSizeFailThreshold()
+    {
+        return 
sizeToString(DEFAULT_CONFIG.getSaiStringTermSizeFailThreshold());
+    }
+
+    @Override
+    public void setSaiStringTermSizeThreshold(@Nullable String warnSize, 
@Nullable String failSize)
+    {
+        DEFAULT_CONFIG.setSaiStringTermSizeThreshold(sizeFromString(warnSize), 
sizeFromString(failSize));
+    }
+
+    @Override
+    @Nullable
+    public String getSaiFrozenTermSizeWarnThreshold()
+    {
+        return 
sizeToString(DEFAULT_CONFIG.getSaiFrozenTermSizeWarnThreshold());
+    }
+
+    @Override
+    @Nullable
+    public String getSaiFrozenTermSizeFailThreshold()
+    {
+        return 
sizeToString(DEFAULT_CONFIG.getSaiFrozenTermSizeFailThreshold());
+    }
+
+    @Override
+    public void setSaiFrozenTermSizeThreshold(@Nullable String warnSize, 
@Nullable String failSize)
+    {
+        DEFAULT_CONFIG.setSaiFrozenTermSizeThreshold(sizeFromString(warnSize), 
sizeFromString(failSize));
+    }
+
+    @Override
+    @Nullable
+    public String getSaiVectorTermSizeWarnThreshold()
+    {
+        return 
sizeToString(DEFAULT_CONFIG.getSaiVectorTermSizeWarnThreshold());
+    }
+
+    @Override
+    @Nullable
+    public String getSaiVectorTermSizeFailThreshold()
+    {
+        return 
sizeToString(DEFAULT_CONFIG.getSaiVectorTermSizeFailThreshold());
+    }
+
+    @Override
+    public void setSaiVectorTermSizeThreshold(@Nullable String warnSize, 
@Nullable String failSize)
+    {
+        DEFAULT_CONFIG.setSaiVectorTermSizeThreshold(sizeFromString(warnSize), 
sizeFromString(failSize));
+    }
+
     @Override
     public boolean getNonPartitionRestrictedQueryEnabled()
     {
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index 5c6880fde5..508ae53fc0 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -464,6 +464,60 @@ public interface GuardrailsConfig
      */
     void setSaiSSTableIndexesPerQueryThreshold(int warn, int fail);
 
+    /**
+     * @return the warning threshold for the size of string terms written to 
an SAI index
+     */
+    DataStorageSpec.LongBytesBound getSaiStringTermSizeWarnThreshold();
+
+    /**
+     * @return the failure threshold for the size of string terms written to 
an SAI index
+     */
+    DataStorageSpec.LongBytesBound getSaiStringTermSizeFailThreshold();
+
+    /**
+     * Sets warning and failure thresholds for the size of string terms 
written to an SAI index
+     *
+     * @param warn value to set for warn threshold
+     * @param fail value to set for fail threshold
+     */
+    void setSaiStringTermSizeThreshold(@Nullable 
DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound 
fail);
+
+    /**
+     * @return the warning threshold for the size of frozen terms written to 
an SAI index
+     */
+    DataStorageSpec.LongBytesBound getSaiFrozenTermSizeWarnThreshold();
+
+    /**
+     * @return the failure threshold for the size of frozen terms written to 
an SAI index
+     */
+    DataStorageSpec.LongBytesBound getSaiFrozenTermSizeFailThreshold();
+
+    /**
+     * Sets warning and failure thresholds for the size of frozen terms 
written to an SAI index
+     *
+     * @param warn value to set for warn threshold
+     * @param fail value to set for fail threshold
+     */
+    void setSaiFrozenTermSizeThreshold(@Nullable 
DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound 
fail);
+
+    /**
+     * @return the warning threshold for the size of vector terms written to 
an SAI index
+     */
+    DataStorageSpec.LongBytesBound getSaiVectorTermSizeWarnThreshold();
+
+    /**
+     * @return the failure threshold for the size of vector terms written to 
an SAI index
+     */
+    DataStorageSpec.LongBytesBound getSaiVectorTermSizeFailThreshold();
+
+    /**
+     * Sets warning and failure thresholds for the size of vector terms 
written to an SAI index
+     *
+     * @param warn value to set for warn threshold
+     * @param fail value to set for fail threshold
+     */
+    void setSaiVectorTermSizeThreshold(@Nullable 
DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound 
fail);
+
     /**
      * Returns whether it is possible to execute a query against secondary 
indexes without specifying
      * any partition key restrictions.
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index 89fe7a70ba..9a3750bba8 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -789,6 +789,78 @@ public interface GuardrailsMBean
      */
     void setSaiSSTableIndexesPerQueryThreshold(int warn, int fail);
 
+    /**
+     * @return The warning threshold for string terms written to an SAI index, 
as a human-readable string.
+     *         (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}) A {@code null} value means disabled.
+     */
+    @Nullable
+    String getSaiStringTermSizeWarnThreshold();
+
+    /**
+     * @return The failure threshold for string terms written to an SAI index, 
as a human-readable string.
+     *         (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}) A {@code null} value means disabled.
+     */
+    @Nullable
+    String getSaiStringTermSizeFailThreshold();
+
+    /**
+     * @param warnSize The warning threshold for string terms written to an 
SAI index, as a human-readable string.
+     *                 (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B})
+     *                 A {@code null} value means disabled.
+     * @param failSize The failure threshold for string terms written to an 
SAI index, as a human-readable string.
+     *                 (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B})
+     *                 A {@code null} value means disabled.
+     */
+    void setSaiStringTermSizeThreshold(@Nullable String warnSize, @Nullable 
String failSize);
+
+    /**
+     * @return The warning threshold for frozen terms written to an SAI index, 
as a human-readable string.
+     *         (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}) A {@code null} value means disabled.
+     */
+    @Nullable
+    String getSaiFrozenTermSizeWarnThreshold();
+
+    /**
+     * @return The failure threshold for frozen terms written to an SAI index, 
as a human-readable string.
+     *         (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}) A {@code null} value means disabled.
+     */
+    @Nullable
+    String getSaiFrozenTermSizeFailThreshold();
+
+    /**
+     * @param warnSize The warning threshold for frozen terms written to an 
SAI index, as a human-readable string.
+     *                 (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B})
+     *                 A {@code null} value means disabled.
+     * @param failSize The failure threshold for frozen terms written to an 
SAI index, as a human-readable string.
+     *                 (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B})
+     *                 A {@code null} value means disabled.
+     */
+    void setSaiFrozenTermSizeThreshold(@Nullable String warnSize, @Nullable 
String failSize);
+
+    /**
+     * @return The warning threshold for vector terms written to an SAI index, 
as a human-readable string.
+     *         (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}) A {@code null} value means disabled.
+     */
+    @Nullable
+    String getSaiVectorTermSizeWarnThreshold();
+
+    /**
+     * @return The failure threshold for vector terms written to an SAI index, 
as a human-readable string.
+     *         (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}) A {@code null} value means disabled.
+     */
+    @Nullable
+    String getSaiVectorTermSizeFailThreshold();
+
+    /**
+     * @param warnSize The warning threshold for vector terms written to an 
SAI index, as a human-readable string.
+     *                 (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B})
+     *                 A {@code null} value means disabled.
+     * @param failSize The failure threshold for vector terms written to an 
SAI index, as a human-readable string.
+     *                 (ex. {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B})
+     *                 A {@code null} value means disabled.
+     */
+    void setSaiVectorTermSizeThreshold(@Nullable String warnSize, @Nullable 
String failSize);
+
     /**
      * Returns whether it is possible to execute a query against secondary 
indexes without specifying
      * any partition key restrictions.
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 1047fc0a70..035cb0edd6 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 import org.apache.cassandra.utils.vint.VIntCoding;
@@ -542,9 +543,9 @@ public class PartitionUpdate extends AbstractBTreePartition
         return new SimpleBuilders.PartitionUpdateBuilder(metadata, 
partitionKeyValues);
     }
 
-    public void validateIndexedColumns()
+    public void validateIndexedColumns(ClientState state)
     {
-        IndexRegistry.obtain(metadata()).validate(this);
+        IndexRegistry.obtain(metadata()).validate(this, state);
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java 
b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
index 3e26032383..8c3b5b4afd 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * A specialised IMutation implementation for virtual keyspaces.
@@ -113,7 +114,7 @@ public final class VirtualMutation implements IMutation
     }
 
     @Override
-    public void validateIndexedColumns()
+    public void validateIndexedColumns(ClientState state)
     {
         // no-op
     }
diff --git a/src/java/org/apache/cassandra/index/Index.java 
b/src/java/org/apache/cassandra/index/Index.java
index 18fed751af..8abc800e0f 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * Consisting of a top level Index interface and two sub-interfaces which 
handle read and write operations,
@@ -498,9 +499,11 @@ public interface Index
      * will process it. The partition key as well as the clustering and
      * cell values for each row in the update may be checked by index
      * implementations
+     * 
      * @param update PartitionUpdate containing the values to be validated by 
registered Index implementations
+     * @param state state related to the client connection
      */
-    public void validate(PartitionUpdate update) throws 
InvalidRequestException;
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException;
 
     /**
      * Returns the SSTable-attached {@link Component}s created by this index.
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java 
b/src/java/org/apache/cassandra/index/IndexRegistry.java
index 46e87f357f..d29bb11db4 100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.io.sstable.SSTableFlushObserver;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * The collection of all Index instances for a base table.
@@ -98,7 +99,7 @@ public interface IndexRegistry
         }
 
         @Override
-        public void validate(PartitionUpdate update)
+        public void validate(PartitionUpdate update, ClientState state)
         {
         }
     };
@@ -113,21 +114,25 @@ public interface IndexRegistry
     {
         final Index index = new Index()
         {
+            @Override
             public Callable<?> getInitializationTask()
             {
                 return null;
             }
 
+            @Override
             public IndexMetadata getIndexMetadata()
             {
                 return null;
             }
 
+            @Override
             public Callable<?> getMetadataReloadTask(IndexMetadata 
indexMetadata)
             {
                 return null;
             }
 
+            @Override
             public void register(IndexRegistry registry)
             {
             }
@@ -137,60 +142,72 @@ public interface IndexRegistry
             {
             }
 
+            @Override
             public Optional<ColumnFamilyStore> getBackingTable()
             {
                 return Optional.empty();
             }
 
+            @Override
             public Callable<?> getBlockingFlushTask()
             {
                 return null;
             }
 
+            @Override
             public Callable<?> getInvalidateTask()
             {
                 return null;
             }
 
+            @Override
             public Callable<?> getTruncateTask(long truncatedAt)
             {
                 return null;
             }
 
+            @Override
             public boolean shouldBuildBlocking()
             {
                 return false;
             }
 
+            @Override
             public boolean dependsOn(ColumnMetadata column)
             {
                 return false;
             }
 
+            @Override
             public boolean supportsExpression(ColumnMetadata column, Operator 
operator)
             {
                 return true;
             }
 
+            @Override
             public AbstractType<?> customExpressionValueType()
             {
                 return BytesType.instance;
             }
 
+            @Override
             public RowFilter getPostIndexQueryFilter(RowFilter filter)
             {
                 return null;
             }
 
+            @Override
             public long getEstimatedResultRows()
             {
                 return 0;
             }
 
-            public void validate(PartitionUpdate update) throws 
InvalidRequestException
+            @Override
+            public void validate(PartitionUpdate update, ClientState state) 
throws InvalidRequestException
             {
             }
 
+            @Override
             public Indexer indexerFor(DecoratedKey key, 
RegularAndStaticColumns columns, long nowInSec, WriteContext ctx, 
IndexTransaction.Type transactionType, Memtable memtable)
             {
                 return null;
@@ -210,16 +227,6 @@ public interface IndexRegistry
                 return Collections.singleton(index);
             }
 
-            @Override
-            public void addIndex(Index index)
-            {
-            }
-
-            @Override
-            public void removeIndex(Index index)
-            {
-            }
-
             @Override
             public boolean containsIndex(Index i)
             {
@@ -254,6 +261,7 @@ public interface IndexRegistry
             }
         };
 
+        @Override
         public void registerIndex(Index index, Index.Group.Key groupKey, 
Supplier<Index.Group> groupSupplier)
         {
         }
@@ -263,11 +271,13 @@ public interface IndexRegistry
         {
         }
 
+        @Override
         public Index getIndex(IndexMetadata indexMetadata)
         {
             return index;
         }
 
+        @Override
         public Collection<Index> listIndexes()
         {
             return Collections.singletonList(index);
@@ -279,12 +289,14 @@ public interface IndexRegistry
             return Collections.singletonList(group);
         }
 
+        @Override
         public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
         {
             return Optional.empty();
         }
 
-        public void validate(PartitionUpdate update)
+        @Override
+        public void validate(PartitionUpdate update, ClientState state)
         {
         }
     };
@@ -313,8 +325,9 @@ public interface IndexRegistry
      * implementations
      *
      * @param update PartitionUpdate containing the values to be validated by 
registered Index implementations
+     * @param state state related to the client connection
      */
-    void validate(PartitionUpdate update);
+    void validate(PartitionUpdate update, ClientState state);
 
     /**
      * Returns the {@code IndexRegistry} associated to the specified table.
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index da15282b39..ebd2cc0379 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -75,6 +75,7 @@ import 
org.apache.cassandra.notifications.SSTableAddedNotification;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.pager.SinglePartitionPager;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -1295,11 +1296,13 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
      * implementations
      *
      * @param update PartitionUpdate containing the values to be validated by 
registered Index implementations
+     * @param state state related to the client connection
      */
-    public void validate(PartitionUpdate update) throws InvalidRequestException
+    @Override
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {
         for (Index index : indexes.values())
-            index.validate(update);
+            index.validate(update, state);
     }
 
     /*
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java 
b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 125c6e9dbd..1f39ea502b 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -57,6 +57,7 @@ import 
org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -308,7 +309,8 @@ public abstract class CassandraIndex implements Index
 
     }
 
-    public void validate(PartitionUpdate update) throws InvalidRequestException
+    @Override
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {
         switch (indexedColumn.kind)
         {
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index d32176a9aa..b42a165f11 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -42,8 +42,7 @@ import javax.annotation.Nullable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +63,9 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.MaxThreshold;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.FloatType;
@@ -102,17 +104,17 @@ import 
org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_FROZEN_TERM_SIZE;
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_STRING_TERM_SIZE;
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE;
 import static 
org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig.MAX_TOP_K;
 
 public class StorageAttachedIndex implements Index
@@ -142,11 +144,7 @@ public class StorageAttachedIndex implements Index
 
     private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
 
-    public static final long MAX_STRING_TERM_SIZE = 
SAI_MAX_STRING_TERM_SIZE.getSizeInBytes();
-    public static final long MAX_FROZEN_TERM_SIZE = 
SAI_MAX_FROZEN_TERM_SIZE.getSizeInBytes();
-    public static final long MAX_VECTOR_TERM_SIZE = 
SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes();
-    public static final String TERM_OVERSIZE_MESSAGE = "Can't add term of 
column %s to index for key: %s, term size %s " +
-                                                       "max allowed size %s, 
use analyzed = true (if not yet set) for that column.";
+    public static final String TERM_OVERSIZE_MESSAGE = "Term in column '%s' 
for key '%s' is too large and cannot be indexed. (term size: %s)";
 
     // Used to build indexes on newly added SSTables:
     private static final StorageAttachedIndexBuildingSupport 
INDEX_BUILDER_SUPPORT = new StorageAttachedIndexBuildingSupport();
@@ -182,7 +180,7 @@ public class StorageAttachedIndex implements Index
     private final PrimaryKey.Factory primaryKeyFactory;
     private final MemtableIndexManager memtableIndexManager;
     private final IndexMetrics indexMetrics;
-    private final long maxTermSize;
+    private final MaxThreshold maxTermSizeGuardrail;
 
     // Tracks whether we've started the index build on initialization.
     private volatile boolean initBuildStarted = false;
@@ -206,8 +204,10 @@ public class StorageAttachedIndex implements Index
         analyzerFactory = AbstractAnalyzer.fromOptions(indexTermType, 
indexMetadata.options);
         memtableIndexManager = new MemtableIndexManager(this);
         indexMetrics = new IndexMetrics(this, memtableIndexManager);
-        maxTermSize = indexTermType.isVector() ? MAX_VECTOR_TERM_SIZE
-                                               : (indexTermType.isFrozen() ? 
MAX_FROZEN_TERM_SIZE : MAX_STRING_TERM_SIZE);
+        maxTermSizeGuardrail = indexTermType.isVector()
+                               ? Guardrails.saiVectorTermSize
+                               : (indexTermType.isFrozen() ? 
Guardrails.saiFrozenTermSize
+                                                           : 
Guardrails.saiStringTermSize);
     }
 
     /**
@@ -497,11 +497,15 @@ public class StorageAttachedIndex implements Index
     }
 
     @Override
-    public void validate(PartitionUpdate update) throws InvalidRequestException
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {
         DecoratedKey key = update.partitionKey();
-        for (Row row : update)
-            validateMaxTermSizeForRow(key, row, true);
+
+        if (indexTermType.columnMetadata().isStatic())
+            validateTermSizeForRow(key, update.staticRow(), true, state);
+        else
+            for (Row row : update)
+                validateTermSizeForRow(key, row, true, state);
     }
 
     @Override
@@ -730,60 +734,62 @@ public class StorageAttachedIndex implements Index
     /**
      * Validate maximum term size for given row
      */
-    public void validateMaxTermSizeForRow(DecoratedKey key, Row row, boolean 
sendClientWarning)
+    public void validateTermSizeForRow(DecoratedKey key, Row row, boolean 
isClientMutation, ClientState state)
     {
         AbstractAnalyzer analyzer = hasAnalyzer() ? analyzer() : null;
         if (indexTermType.isNonFrozenCollection())
         {
             Iterator<ByteBuffer> bufferIterator = indexTermType.valuesOf(row, 
FBUtilities.nowInSeconds());
             while (bufferIterator != null && bufferIterator.hasNext())
-                validateMaxTermSizeForCell(analyzer, key, 
bufferIterator.next(), sendClientWarning);
+                validateTermSizeForCell(analyzer, key, bufferIterator.next(), 
isClientMutation, state);
         }
         else
         {
             ByteBuffer value = indexTermType.valueOf(key, row, 
FBUtilities.nowInSeconds());
-            validateMaxTermSizeForCell(analyzer, key, value, 
sendClientWarning);
+            validateTermSizeForCell(analyzer, key, value, isClientMutation, 
state);
         }
     }
 
-    private void validateMaxTermSizeForCell(AbstractAnalyzer analyzer, 
DecoratedKey key, @Nullable ByteBuffer cellBuffer, boolean sendClientWarning)
+    private void validateTermSizeForCell(AbstractAnalyzer analyzer, 
DecoratedKey key, @Nullable ByteBuffer cellBuffer, boolean isClientMutation, 
ClientState state)
     {
         if (cellBuffer == null || cellBuffer.remaining() == 0)
             return;
 
         // analyzer should not return terms that are larger than the origin 
value.
-        if (cellBuffer.remaining() <= maxTermSize)
+        if (!maxTermSizeGuardrail.warnsOn(cellBuffer.remaining(), null))
             return;
 
         if (analyzer != null)
         {
             analyzer.reset(cellBuffer.duplicate());
             while (analyzer.hasNext())
-                validateMaxTermSize(key, analyzer.next(), sendClientWarning);
+                validateTermSize(key, analyzer.next(), isClientMutation, 
state);
         }
         else
         {
-            validateMaxTermSize(key, cellBuffer.duplicate(), 
sendClientWarning);
+            validateTermSize(key, cellBuffer.duplicate(), isClientMutation, 
state);
         }
     }
 
     /**
-     * Validate maximum term size for given term
-     * @return true if given term is valid; otherwise false.
+     * @return true if the size of the given term is below the maximum term 
size, false otherwise
+     * 
+     * @throws GuardrailViolatedException if a client mutation contains a term 
that breaches the failure threshold
      */
-    public boolean validateMaxTermSize(DecoratedKey key, ByteBuffer term, 
boolean sendClientWarning)
+    public boolean validateTermSize(DecoratedKey key, ByteBuffer term, boolean 
isClientMutation, ClientState state)
     {
-        if (term.remaining() > maxTermSize)
+        if (isClientMutation)
         {
-            String message = 
indexIdentifier.logMessage(String.format(TERM_OVERSIZE_MESSAGE,
-                                                        
indexTermType.columnName(),
-                                                        key,
-                                                        
FBUtilities.prettyPrintMemory(term.remaining()),
-                                                        
FBUtilities.prettyPrintMemory(maxTermSize)));
-
-            if (sendClientWarning)
-                ClientWarn.instance.warn(message);
+            maxTermSizeGuardrail.guard(term.remaining(), 
indexTermType.columnName(), false, state);
+            return true;
+        }
 
+        if (maxTermSizeGuardrail.failsOn(term.remaining(), state))
+        {
+            String message = 
indexIdentifier.logMessage(String.format(TERM_OVERSIZE_MESSAGE,
+                                                                      
indexTermType.columnName(),
+                                                                      key,
+                                                                      
FBUtilities.prettyPrintMemory(term.remaining())));
             noSpamLogger.warn(message);
             return false;
         }
@@ -869,6 +875,7 @@ public class StorageAttachedIndex implements Index
         return FutureCombiner.allOf(futures);
     }
 
+    @SuppressWarnings("SameReturnValue")
     private Future<?> startPreJoinTask()
     {
         try
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index 06bb7d80fe..58ee69a215 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -187,7 +187,7 @@ public class SSTableIndexWriter implements 
PerColumnIndexWriter
 
     private void addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId) 
throws IOException
     {
-        if (!index.validateMaxTermSize(key.partitionKey(), term, false))
+        if (!index.validateTermSize(key.partitionKey(), term, false, null))
             return;
 
         if (currentBuilder == null)
diff --git 
a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java 
b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
index 9997616d8b..c8d32a8386 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
@@ -204,7 +204,7 @@ public class TrieMemoryIndex extends MemoryIndex
 
     private void addTerm(PrimaryKey primaryKey, ByteBuffer term)
     {
-        if (index.validateMaxTermSize(primaryKey.partitionKey(), term, false))
+        if (index.validateTermSize(primaryKey.partitionKey(), term, false, 
null))
         {
             setMinMaxTerm(term.duplicate());
 
diff --git 
a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java 
b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java
index ad94f4475d..bea5cb877f 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java
@@ -77,7 +77,7 @@ public class VectorMemoryIndex extends MemoryIndex
     @Override
     public synchronized long add(DecoratedKey key, Clustering<?> clustering, 
ByteBuffer value)
     {
-        if (value == null || value.remaining() == 0 || 
!index.validateMaxTermSize(key, value, false))
+        if (value == null || value.remaining() == 0 || 
!index.validateTermSize(key, value, false, null))
             return 0;
 
         var primaryKey = index.hasClustering() ? 
index.keyFactory().create(key, clustering)
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java 
b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 93448f9e78..ccfed7f5c9 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -76,6 +76,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -266,7 +267,8 @@ public class SASIIndex implements Index, 
INotificationConsumer
         return Long.MIN_VALUE;
     }
 
-    public void validate(PartitionUpdate update) throws InvalidRequestException
+    @Override
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {}
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java
 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java
index bfb73dba05..5348087279 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedIndex.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.CloseableIterator;
 
 import static java.util.Collections.singletonList;
@@ -238,7 +239,8 @@ public class PaxosUncommittedIndex implements Index, 
PaxosUncommittedTracker.Upd
         return 0;
     }
 
-    public void validate(PartitionUpdate update) throws InvalidRequestException
+    @Override
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {
 
     }
diff --git a/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java 
b/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java
index 29b2f767e3..b1c17ad2c3 100644
--- a/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java
+++ b/test/anttasks/org/apache/cassandra/anttasks/TestNameCheckTask.java
@@ -24,7 +24,10 @@ import java.lang.reflect.Modifier;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.ArrayDeque;
 import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -158,14 +161,21 @@ public class TestNameCheckTask
      */
     private Stream<? extends Class<?>> expand(Class<?> klass, Reflections 
reflections)
     {
-        Set<? extends Class<?>> subTypes = reflections.getSubTypesOf(klass);
-        if (subTypes == null || subTypes.isEmpty())
-            return Stream.of(klass);
-        Stream<? extends Class<?>> subs = subTypes.stream();
-        // assume we include if not abstract
-        if (!Modifier.isAbstract(klass.getModifiers()))
-            subs = Stream.concat(Stream.of(klass), subs);
-        return subs;
+        Set<Class<?>> concreteTypes = new HashSet<>();
+        Deque<Class<?>> typeStack = new ArrayDeque<>();
+        typeStack.push(klass);
+
+        while (!typeStack.isEmpty())
+        {
+            Class<?> type = typeStack.pop();
+
+            if (!Modifier.isAbstract(type.getModifiers()))
+                concreteTypes.add(type);
+
+            reflections.getSubTypesOf(type).forEach(typeStack::push);
+        }
+
+        return concreteTypes.stream();
     }
 
     public static void main(String[] args)
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java
 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java
index f0513daa9c..7827508dd4 100644
--- 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailColumnValueSizeTest.java
@@ -18,21 +18,11 @@
 
 package org.apache.cassandra.db.guardrails;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.function.Function;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DataStorageSpec;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.marshal.SetType;
 
 import static java.lang.String.format;
 import static java.nio.ByteBuffer.allocate;
@@ -41,7 +31,7 @@ import static 
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES;
 /**
  * Tests the guardrail for the size of column values, {@link 
Guardrails#columnValueSize}.
  */
-public class GuardrailColumnValueSizeTest extends ThresholdTester
+public class GuardrailColumnValueSizeTest extends ValueThresholdTester
 {
     private static final int WARN_THRESHOLD = 1024; // bytes
     private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes
@@ -58,6 +48,18 @@ public class GuardrailColumnValueSizeTest extends 
ThresholdTester
               size -> new DataStorageSpec.LongBytesBound(size).toBytes());
     }
 
+    /**
+     * @return the warn threshold used by this test, {@link #WARN_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int warnThreshold()
+    {
+        return WARN_THRESHOLD;
+    }
+
+    /**
+     * @return the fail threshold used by this test, {@link #FAIL_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int failThreshold()
+    {
+        return FAIL_THRESHOLD;
+    }
+
     @Test
     public void testSimplePartitionKey() throws Throwable
     {
@@ -386,10 +388,10 @@ public class GuardrailColumnValueSizeTest extends 
ThresholdTester
     {
         createTable("CREATE TABLE %s (k text, c text, v text, s text STATIC, 
PRIMARY KEY(k, c))");
 
-        // partition key, the CAS updates with values beyond the threshold are 
not applied so they don't come to fail
+        // partition key, the CAS updates with values beyond the threshold are 
not applied, so they don't come to fail
         testNoThreshold("UPDATE %s SET v = '0' WHERE k = ? AND c = '0' IF 
EXISTS");
 
-        // clustering key, the CAS updates with values beyond the threshold 
are not applied so they don't come to fail
+        // clustering key, the CAS updates with values beyond the threshold 
are not applied, so they don't come to fail
         testNoThreshold("UPDATE %s SET v = '0' WHERE k = '0' AND c = ? IF 
EXISTS");
 
         // static column, only the applied CAS updates can fire the guardrail
@@ -442,213 +444,4 @@ public class GuardrailColumnValueSizeTest extends 
ThresholdTester
         testNoThreshold("SELECT * FROM %s WHERE s = ? ALLOW FILTERING");
         testNoThreshold("SELECT * FROM %s WHERE r = ? ALLOW FILTERING");
     }
-
-    /**
-     * Tests that the max column size guardrail threshold is not applied for 
the specified 1-placeholder CQL query.
-     *
-     * @param query a CQL modification statement with exactly one placeholder
-     */
-    private void testNoThreshold(String query) throws Throwable
-    {
-        assertValid(query, allocate(1));
-
-        assertValid(query, allocate(WARN_THRESHOLD));
-        assertValid(query, allocate(WARN_THRESHOLD + 1));
-
-        assertValid(query, allocate(FAIL_THRESHOLD));
-        assertValid(query, allocate(FAIL_THRESHOLD + 1));
-    }
-
-    /**
-     * Tests that the max column size guardrail threshold is not applied for 
the specified 2-placeholder CQL query.
-     *
-     * @param query a CQL modification statement with exactly two placeholders
-     */
-    private void testNoThreshold2(String query) throws Throwable
-    {
-        assertValid(query, allocate(1), allocate(1));
-
-        assertValid(query, allocate(WARN_THRESHOLD), allocate(1));
-        assertValid(query, allocate(1), allocate(WARN_THRESHOLD));
-        assertValid(query, allocate((WARN_THRESHOLD)), 
allocate((WARN_THRESHOLD)));
-        assertValid(query, allocate(WARN_THRESHOLD + 1), allocate(1));
-        assertValid(query, allocate(1), allocate(WARN_THRESHOLD + 1));
-
-        assertValid(query, allocate(FAIL_THRESHOLD), allocate(1));
-        assertValid(query, allocate(1), allocate(FAIL_THRESHOLD));
-        assertValid(query, allocate((FAIL_THRESHOLD)), 
allocate((FAIL_THRESHOLD)));
-        assertValid(query, allocate(FAIL_THRESHOLD + 1), allocate(1));
-        assertValid(query, allocate(1), allocate(FAIL_THRESHOLD + 1));
-    }
-
-    /**
-     * Tests that the max column size guardrail threshold is applied for the 
specified 1-placeholder CQL query.
-     *
-     * @param column the name of the column referenced by the query placeholder
-     * @param query  a CQL query with exactly one placeholder
-     */
-    private void testThreshold(String column, String query) throws Throwable
-    {
-        testThreshold(column, query, 0);
-    }
-
-    /**
-     * Tests that the max column size guardrail threshold is applied for the 
specified 1-placeholder CQL query.
-     *
-     * @param column             the name of the column referenced by the 
query placeholder
-     * @param query              a CQL query with exactly one placeholder
-     * @param serializationBytes the extra bytes added to the placeholder 
value by its wrapping column type serializer
-     */
-    private void testThreshold(String column, String query, int 
serializationBytes) throws Throwable
-    {
-        int warn = WARN_THRESHOLD - serializationBytes;
-        int fail = FAIL_THRESHOLD - serializationBytes;
-
-        assertValid(query, allocate(0));
-        assertValid(query, allocate(warn));
-        assertWarns(column, query, allocate(warn + 1));
-        assertFails(column, query, allocate(fail + 1));
-    }
-
-    /**
-     * Tests that the max column size guardrail threshold is applied for the 
specified 2-placeholder CQL query.
-     *
-     * @param column the name of the column referenced by the placeholders
-     * @param query  a CQL query with exactly two placeholders
-     */
-    private void testThreshold2(String column, String query) throws Throwable
-    {
-        testThreshold2(column, query, 0);
-    }
-
-    /**
-     * Tests that the max column size guardrail threshold is applied for the 
specified 2-placeholder query.
-     *
-     * @param column             the name of the column referenced by the 
placeholders
-     * @param query              a CQL query with exactly two placeholders
-     * @param serializationBytes the extra bytes added to the size of the 
placeholder value by their wrapping serializer
-     */
-    private void testThreshold2(String column, String query, int 
serializationBytes) throws Throwable
-    {
-        int warn = WARN_THRESHOLD - serializationBytes;
-        int fail = FAIL_THRESHOLD - serializationBytes;
-
-        assertValid(query, allocate(0), allocate(0));
-        assertValid(query, allocate(warn), allocate(0));
-        assertValid(query, allocate(0), allocate(warn));
-        assertValid(query, allocate(warn / 2), allocate(warn / 2));
-
-        assertWarns(column, query, allocate(warn + 1), allocate(0));
-        assertWarns(column, query, allocate(0), allocate(warn + 1));
-
-        assertFails(column, query, allocate(fail + 1), allocate(0));
-        assertFails(column, query, allocate(0), allocate(fail + 1));
-    }
-
-    private void testCollection(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable
-    {
-        assertValid(query, collectionBuilder, allocate(1));
-        assertValid(query, collectionBuilder, allocate(1), allocate(1));
-        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD));
-        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD), 
allocate(1));
-        assertValid(query, collectionBuilder, allocate(1), 
allocate(WARN_THRESHOLD));
-        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD), 
allocate(WARN_THRESHOLD));
-
-        assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD 
+ 1));
-        assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD 
+ 1), allocate(1));
-        assertWarns(column, query, collectionBuilder, allocate(1), 
allocate(WARN_THRESHOLD + 1));
-
-        assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD 
+ 1));
-        assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD 
+ 1), allocate(1));
-        assertFails(column, query, collectionBuilder, allocate(1), 
allocate(FAIL_THRESHOLD + 1));
-    }
-
-    private void testFrozenCollection(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable
-    {
-        assertValid(query, collectionBuilder, allocate(1));
-        assertValid(query, collectionBuilder, allocate(WARN_THRESHOLD - 8));
-        assertValid(query, collectionBuilder, allocate((WARN_THRESHOLD - 12) / 
2), allocate((WARN_THRESHOLD - 12) / 2));
-
-        assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD 
- 7));
-        assertWarns(column, query, collectionBuilder, allocate(WARN_THRESHOLD 
- 12), allocate(1));
-
-        assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD 
- 7));
-        assertFails(column, query, collectionBuilder, allocate(FAIL_THRESHOLD 
- 12), allocate(1));
-    }
-
-    private void testMap(String column, String query) throws Throwable
-    {
-        assertValid(query, this::map, allocate(1), allocate(1));
-        assertValid(query, this::map, allocate(WARN_THRESHOLD), allocate(1));
-        assertValid(query, this::map, allocate(1), allocate(WARN_THRESHOLD));
-        assertValid(query, this::map, allocate(WARN_THRESHOLD), 
allocate(WARN_THRESHOLD));
-
-        assertWarns(column, query, this::map, allocate(1), 
allocate(WARN_THRESHOLD + 1));
-        assertWarns(column, query, this::map, allocate(WARN_THRESHOLD + 1), 
allocate(1));
-
-        assertFails(column, query, this::map, allocate(FAIL_THRESHOLD + 1), 
allocate(1));
-        assertFails(column, query, this::map, allocate(1), 
allocate(FAIL_THRESHOLD + 1));
-        assertFails(column, query, this::map, allocate(FAIL_THRESHOLD + 1), 
allocate(FAIL_THRESHOLD + 1));
-    }
-
-    private void assertValid(String query, ByteBuffer... values) throws 
Throwable
-    {
-        assertValid(() -> execute(query, values));
-    }
-
-    private void assertValid(String query, Function<ByteBuffer[], ByteBuffer> 
collectionBuilder, ByteBuffer... values) throws Throwable
-    {
-        assertValid(() -> execute(query, collectionBuilder.apply(values)));
-    }
-
-    private void assertWarns(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) 
throws Throwable
-    {
-        assertWarns(column, query, collectionBuilder.apply(values));
-    }
-
-    private void assertWarns(String column, String query, ByteBuffer... 
values) throws Throwable
-    {
-        String errorMessage = format("Value of column %s has size %s, this 
exceeds the warning threshold of %s.",
-                                     column, WARN_THRESHOLD + 1, 
WARN_THRESHOLD);
-        assertWarns(() -> execute(query, values), errorMessage);
-    }
-
-    private void assertFails(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) 
throws Throwable
-    {
-        assertFails(column, query, collectionBuilder.apply(values));
-    }
-
-    private void assertFails(String column, String query, ByteBuffer... 
values) throws Throwable
-    {
-        String errorMessage = format("Value of column %s has size %s, this 
exceeds the failure threshold of %s.",
-                                     column, FAIL_THRESHOLD + 1, 
FAIL_THRESHOLD);
-        assertFails(() -> execute(query, values), errorMessage);
-    }
-
-    private void execute(String query, ByteBuffer... values)
-    {
-        execute(userClientState, query, Arrays.asList(values));
-    }
-
-    private ByteBuffer set(ByteBuffer... values)
-    {
-        return SetType.getInstance(BytesType.instance, 
true).decompose(ImmutableSet.copyOf(values));
-    }
-
-    private ByteBuffer list(ByteBuffer... values)
-    {
-        return ListType.getInstance(BytesType.instance, 
true).decompose(ImmutableList.copyOf(values));
-    }
-
-    private ByteBuffer map(ByteBuffer... values)
-    {
-        assert values.length % 2 == 0;
-
-        int size = values.length / 2;
-        Map<ByteBuffer, ByteBuffer> m = new LinkedHashMap<>(size);
-        for (int i = 0; i < size; i++)
-            m.put(values[2 * i], values[(2 * i) + 1]);
-
-        return MapType.getInstance(BytesType.instance, BytesType.instance, 
true).decompose(m);
-    }
 }
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiFrozenTermSizeTest.java
 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiFrozenTermSizeTest.java
new file mode 100644
index 0000000000..50cdb2ab58
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiFrozenTermSizeTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static java.lang.String.format;
+import static 
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the guardrails around the size of SAI frozen terms
+ *
+ * @see Guardrails#saiFrozenTermSize
+ */
+public class GuardrailSaiFrozenTermSizeTest extends ValueThresholdTester
+{
+    private static final int WARN_THRESHOLD = 2048; // bytes
+    private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes
+
+    public GuardrailSaiFrozenTermSizeTest()
+    {
+        super(WARN_THRESHOLD + "B",
+              FAIL_THRESHOLD + "B",
+              Guardrails.saiFrozenTermSize,
+              Guardrails::setSaiFrozenTermSizeThreshold,
+              Guardrails::getSaiFrozenTermSizeWarnThreshold,
+              Guardrails::getSaiFrozenTermSizeFailThreshold,
+              bytes -> new DataStorageSpec.LongBytesBound(bytes, BYTES).toString(),
+              size -> new DataStorageSpec.LongBytesBound(size).toBytes());
+    }
+
+    /**
+     * @return the warn threshold used by this test, {@link #WARN_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int warnThreshold()
+    {
+        return WARN_THRESHOLD;
+    }
+
+    /**
+     * @return the fail threshold used by this test, {@link #FAIL_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int failThreshold()
+    {
+        return FAIL_THRESHOLD;
+    }
+
+    /**
+     * Checks the thresholds for writes to an SAI-indexed two-element tuple column.
+     */
+    @Test
+    public void testTuple() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, t tuple<text, text>)");
+        createIndex("CREATE INDEX ON %s (t) USING 'sai'");
+
+        // NOTE(review): 8 is handed to the tester as the serialization overhead of the tuple;
+        // presumably one 32-bit length prefix per element - confirm against ValueThresholdTester
+        testThreshold2("t", "INSERT INTO %s (k, t) VALUES (0, (?, ?))", 8);
+        testThreshold2("t", "UPDATE %s SET t = (?, ?) WHERE k = 0", 8);
+    }
+
+    /**
+     * Checks the thresholds for writes to an SAI-indexed frozen UDT column, setting either or both fields.
+     */
+    @Test
+    public void testFrozenUDT() throws Throwable
+    {
+        String udt = createType("CREATE TYPE %s (a text, b text)");
+        createTable(format("CREATE TABLE %%s (k int PRIMARY KEY, v frozen<%s>)", udt));
+        createIndex("CREATE INDEX ON %s (v) USING 'sai'");
+
+        testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, {a: ?})", 8);
+        testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, {b: ?})", 8);
+        testThreshold("v", "UPDATE %s SET v = {a: ?} WHERE k = 0", 8);
+        testThreshold("v", "UPDATE %s SET v = {b: ?} WHERE k = 0", 8);
+        testThreshold2("v", "INSERT INTO %s (k, v) VALUES (0, {a: ?, b: ?})", 8);
+        testThreshold2("v", "UPDATE %s SET v = {a: ?, b: ?} WHERE k = 0", 8);
+    }
+
+    /**
+     * Checks the thresholds for writes to a frozen list column with a FULL SAI index.
+     */
+    @Test
+    public void testFrozenList() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, fl frozen<list<text>>)");
+        createIndex("CREATE INDEX ON %s (FULL(fl)) USING 'sai'");
+
+        // the serialized size of a frozen list is the size of its serialized elements, plus a 32-bit integer prefix for
+        // the number of elements, and another 32-bit integer for the size of each element
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, fl) VALUES (0, ?)",
+                                          "UPDATE %s SET fl = ? WHERE k = 0"))
+        {
+            testFrozenCollection("fl", query, this::list);
+        }
+    }
+
+    /**
+     * Writes a tuple above the warn threshold before the index exists, then creates the index and
+     * checks that both the large and the small tuple can be found through it.
+     */
+    @Test
+    public void testWarningTupleOnBuild()
+    {
+        ByteBuffer largeTuple = ByteBuffer.allocate(warnThreshold() + 1);
+        ByteBuffer smallTuple = ByteBuffer.allocate(1);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, t tuple<text>)");
+        execute("INSERT INTO %s (k, t) VALUES (0, (?))", largeTuple);
+        execute("INSERT INTO %s (k, t) VALUES (1, (?))", smallTuple);
+        createIndex("CREATE INDEX ON %s(t) USING 'sai'");
+
+        // verify that the large tuple is written on initial index build
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", largeTuple)).result.size());
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", smallTuple)).result.size());
+    }
+
+    /**
+     * Writes a tuple above the fail threshold before the index exists, then creates the index and
+     * checks that only the small tuple can be found through it.
+     */
+    @Test
+    public void testFailingTupleOnBuild()
+    {
+        ByteBuffer oversizedTuple = ByteBuffer.allocate(failThreshold() + 1);
+        ByteBuffer smallTuple = ByteBuffer.allocate(1);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, t tuple<text>)");
+        execute("INSERT INTO %s (k, t) VALUES (0, (?))", oversizedTuple);
+        execute("INSERT INTO %s (k, t) VALUES (1, (?))", smallTuple);
+        createIndex("CREATE INDEX ON %s(t) USING 'sai'");
+
+        // verify that the oversized tuple isn't written on initial index build
+        assertEquals(0, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", oversizedTuple)).result.size());
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE t = (?)", smallTuple)).result.size());
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiStringTermSizeTest.java
 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiStringTermSizeTest.java
new file mode 100644
index 0000000000..868d736fad
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiStringTermSizeTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static java.nio.ByteBuffer.allocate;
+import static 
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the guardrails around the size of SAI string terms
+ *
+ * @see Guardrails#saiStringTermSize
+ */
+public class GuardrailSaiStringTermSizeTest extends ValueThresholdTester
+{
+    private static final int WARN_THRESHOLD = 1024; // bytes
+    private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes
+
+    public GuardrailSaiStringTermSizeTest()
+    {
+        super(WARN_THRESHOLD + "B",
+              FAIL_THRESHOLD + "B",
+              Guardrails.saiStringTermSize,
+              Guardrails::setSaiStringTermSizeThreshold,
+              Guardrails::getSaiStringTermSizeWarnThreshold,
+              Guardrails::getSaiStringTermSizeFailThreshold,
+              bytes -> new DataStorageSpec.LongBytesBound(bytes, BYTES).toString(),
+              size -> new DataStorageSpec.LongBytesBound(size).toBytes());
+    }
+
+    /**
+     * @return the warn threshold used by this test, {@link #WARN_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int warnThreshold()
+    {
+        return WARN_THRESHOLD;
+    }
+
+    /**
+     * @return the fail threshold used by this test, {@link #FAIL_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int failThreshold()
+    {
+        return FAIL_THRESHOLD;
+    }
+
+    /**
+     * Checks the thresholds for an SAI-indexed component of a composite partition key.
+     */
+    @Test
+    public void testCompositePartitionKey() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k1 int, k2 text, v int, PRIMARY KEY((k1, k2)))");
+        createIndex("CREATE INDEX ON %s (k2) USING 'sai'");
+
+        testThreshold("k2", "INSERT INTO %s (k1, k2, v) VALUES (0, ?, 0)");
+        testThreshold("k2", "UPDATE %s SET v = 1 WHERE k1 = 0 AND k2 = ?");
+    }
+
+    /**
+     * Checks the thresholds for an SAI-indexed clustering column.
+     */
+    @Test
+    public void testSimpleClustering() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c text, v int, PRIMARY KEY(k, c))");
+        createIndex("CREATE INDEX ON %s (c) USING 'sai'");
+
+        testThreshold("c", "INSERT INTO %s (k, c, v) VALUES (0, ?, 0)");
+        testThreshold("c", "UPDATE %s SET v = 1 WHERE k = 0 AND c = ?");
+    }
+
+    /**
+     * Checks the thresholds for an SAI-indexed regular column.
+     */
+    @Test
+    public void testRegularColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v text)");
+        createIndex("CREATE INDEX ON %s (v) USING 'sai'");
+
+        testThreshold("v", "INSERT INTO %s (k, v) VALUES (0, ?)");
+        testThreshold("v", "UPDATE %s SET v = ? WHERE k = 0");
+    }
+
+    /**
+     * Checks the thresholds for an SAI-indexed static column, written with and without a clustering row.
+     */
+    @Test
+    public void testStaticColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c int, s text STATIC, r int, PRIMARY KEY(k, c))");
+        createIndex("CREATE INDEX ON %s (s) USING 'sai'");
+
+        testThreshold("s", "INSERT INTO %s (k, s) VALUES (0, ?)");
+        testThreshold("s", "INSERT INTO %s (k, c, s, r) VALUES (0, 0, ?, 0)");
+        testThreshold("s", "UPDATE %s SET s = ? WHERE k = 0");
+        testThreshold("s", "UPDATE %s SET s = ?, r = 0 WHERE k = 0 AND c = 0");
+    }
+
+    /**
+     * Checks the thresholds for an SAI-indexed (non-frozen) list column: whole-list writes, appends,
+     * writes by index, and removals.
+     */
+    @Test
+    public void testList() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<text>)");
+        createIndex("CREATE INDEX ON %s (l) USING 'sai'");
+
+        for (String query : Arrays.asList("INSERT INTO %s (k, l) VALUES (0, ?)",
+                                          "UPDATE %s SET l = ? WHERE k = 0",
+                                          "UPDATE %s SET l = l + ? WHERE k = 0"))
+        {
+            testCollection("l", query, this::list);
+        }
+
+        testThreshold("l", "UPDATE %s SET l[0] = ? WHERE k = 0");
+
+        String query = "UPDATE %s SET l = l - ? WHERE k = 0";
+        assertValid(query, this::list, allocate(1));
+        assertValid(query, this::list, allocate(FAIL_THRESHOLD));
+        // the removal doesn't write anything, because a value this large couldn't have been written in the first place
+        assertValid(query, this::list, allocate(FAIL_THRESHOLD + 1));
+    }
+
+    /**
+     * Checks the thresholds for single-statement batches writing to SAI-indexed static and regular columns.
+     */
+    @Test
+    public void testBatch() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, c text, r text, s text STATIC, PRIMARY KEY(k, c))");
+        createIndex("CREATE INDEX ON %s (s) USING 'sai'");
+        createIndex("CREATE INDEX ON %s (r) USING 'sai'");
+
+        // static column
+        testThreshold("s", "BEGIN BATCH INSERT INTO %s (k, s) VALUES ('0', ?); APPLY BATCH;");
+        testThreshold("s", "BEGIN BATCH INSERT INTO %s (k, s, c, r) VALUES ('0', ?, '0', '0'); APPLY BATCH;");
+        testThreshold("s", "BEGIN BATCH UPDATE %s SET s = ? WHERE k = '0'; APPLY BATCH;");
+        testThreshold("s", "BEGIN BATCH UPDATE %s SET s = ?, r = '0' WHERE k = '0' AND c = '0'; APPLY BATCH;");
+
+        // regular column
+        testThreshold("r", "BEGIN BATCH INSERT INTO %s (k, c, r) VALUES ('0', '0', ?); APPLY BATCH;");
+        testThreshold("r", "BEGIN BATCH UPDATE %s SET r = ? WHERE k = '0' AND c = '0'; APPLY BATCH;");
+    }
+
+    /**
+     * Checks that conditional (IF NOT EXISTS) inserts only trigger the guardrail when they are applied.
+     */
+    @Test
+    public void testCASWithIfNotExistsCondition() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, c text, v text, s text STATIC, PRIMARY KEY(k, c))");
+        createIndex("CREATE INDEX ON %s (s) USING 'sai'");
+        createIndex("CREATE INDEX ON %s (v) USING 'sai'");
+
+        // static column
+        assertValid("INSERT INTO %s (k, s) VALUES ('1', ?) IF NOT EXISTS", allocate(1));
+        assertValid("INSERT INTO %s (k, s) VALUES ('2', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD));
+        assertValid("INSERT INTO %s (k, s) VALUES ('2', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1)); // not applied
+        assertWarns("s", "INSERT INTO %s (k, s) VALUES ('3', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1));
+
+        // regular column
+        assertValid("INSERT INTO %s (k, c, v) VALUES ('4', '0', ?) IF NOT EXISTS", allocate(1));
+        assertValid("INSERT INTO %s (k, c, v) VALUES ('5', '0', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD));
+        assertValid("INSERT INTO %s (k, c, v) VALUES ('5', '0', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1)); // not applied
+        assertWarns("v", "INSERT INTO %s (k, c, v) VALUES ('6', '0', ?) IF NOT EXISTS", allocate(WARN_THRESHOLD + 1));
+    }
+
+    /**
+     * Checks that SELECT statements never trigger the guardrail, whatever the size of the bound value.
+     */
+    @Test
+    public void testSelect() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, c text, r text, s text STATIC, PRIMARY KEY(k, c))");
+        createIndex("CREATE INDEX ON %s (c) USING 'sai'");
+        createIndex("CREATE INDEX ON %s (r) USING 'sai'");
+        createIndex("CREATE INDEX ON %s (s) USING 'sai'");
+
+        // the guardrail is only checked for writes; reads are excluded
+        testNoThreshold("SELECT * FROM %s WHERE k = ?");
+        testNoThreshold("SELECT * FROM %s WHERE k = '0' AND c = ?");
+        testNoThreshold("SELECT * FROM %s WHERE c = ? ALLOW FILTERING");
+        testNoThreshold("SELECT * FROM %s WHERE s = ? ALLOW FILTERING");
+        testNoThreshold("SELECT * FROM %s WHERE r = ? ALLOW FILTERING");
+    }
+
+    /**
+     * Writes a term above the warn threshold before the index exists, then creates the index and
+     * checks that both the large and the small term can be found through it.
+     */
+    @Test
+    public void testWarningTermOnBuild()
+    {
+        ByteBuffer largeTerm = allocate(warnThreshold() + 1);
+        ByteBuffer smallTerm = allocate(1);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v text)");
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", largeTerm);
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", smallTerm);
+        createIndex("CREATE INDEX ON %s(v) USING 'sai'");
+
+        // verify that the large term is written on initial index build
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", largeTerm)).result.size());
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", smallTerm)).result.size());
+    }
+
+    /**
+     * Writes a term above the fail threshold before the index exists, then creates the index and
+     * checks that only the small term can be found through it.
+     */
+    @Test
+    public void testFailingTermOnBuild()
+    {
+        ByteBuffer oversizedTerm = allocate(failThreshold() + 1);
+        ByteBuffer smallTerm = allocate(1);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v text)");
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", oversizedTerm);
+        execute("INSERT INTO %s (k, v) VALUES (1, ?)", smallTerm);
+        createIndex("CREATE INDEX ON %s(v) USING 'sai'");
+
+        // verify that the oversized term isn't written on initial index build
+        assertEquals(0, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", oversizedTerm)).result.size());
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s WHERE v = ?", smallTerm)).result.size());
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiVectorTermSizeTest.java
 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiVectorTermSizeTest.java
new file mode 100644
index 0000000000..6269f4d98c
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailSaiVectorTermSizeTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.primitives.Floats;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.VectorType;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static 
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.BYTES;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the guardrails around the size of SAI vector terms
+ *
+ * @see Guardrails#saiVectorTermSize
+ */
+public class GuardrailSaiVectorTermSizeTest extends ValueThresholdTester
+{
+    private static final int WARN_THRESHOLD = 1024; // bytes
+    private static final int FAIL_THRESHOLD = WARN_THRESHOLD * 4; // bytes
+
+    public GuardrailSaiVectorTermSizeTest()
+    {
+        super(WARN_THRESHOLD + "B",
+              FAIL_THRESHOLD + "B",
+              Guardrails.saiVectorTermSize,
+              Guardrails::setSaiVectorTermSizeThreshold,
+              Guardrails::getSaiVectorTermSizeWarnThreshold,
+              Guardrails::getSaiVectorTermSizeFailThreshold,
+              bytes -> new DataStorageSpec.LongBytesBound(bytes, BYTES).toString(),
+              size -> new DataStorageSpec.LongBytesBound(size).toBytes());
+    }
+
+    /**
+     * @return the warn threshold used by this test, {@link #WARN_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int warnThreshold()
+    {
+        return WARN_THRESHOLD;
+    }
+
+    /**
+     * @return the fail threshold used by this test, {@link #FAIL_THRESHOLD}, in bytes
+     */
+    @Override
+    protected int failThreshold()
+    {
+        return FAIL_THRESHOLD;
+    }
+
+    /**
+     * Checks that inserting a float vector one dimension past the warn threshold into an
+     * SAI-indexed column produces a warning.
+     */
+    @Test
+    public void testWarn() throws Throwable
+    {
+        int warnDimensions = warnThreshold() / 4; // 4 bytes per dimension
+        List<Float> warnVector = Floats.asList(new float[warnDimensions + 1]);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + warnVector.size() + ">)");
+        createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}");
+
+        VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, warnDimensions + 1);
+        assertWarns(() -> execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorType.decompose(warnVector)),
+                    "Value of column 'v' has size");
+    }
+
+    /**
+     * Checks that inserting a float vector one dimension past the fail threshold into an
+     * SAI-indexed column is rejected.
+     */
+    @Test
+    public void testFail() throws Throwable
+    {
+        int failDimensions = failThreshold() / 4; // 4 bytes per dimension
+        List<Float> failVector = Floats.asList(new float[failDimensions + 1]);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + failVector.size() + ">)");
+        createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}");
+
+        VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, failDimensions + 1);
+        assertFails(() -> execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorType.decompose(failVector)),
+                    "Value of column 'v' has size");
+    }
+
+    /**
+     * Writes a vector above the warn threshold before the index exists, then creates the index and
+     * checks that an ANN query returns the row.
+     */
+    @Test
+    public void testWarningVectorOnBuild()
+    {
+        int warnDimensions = warnThreshold() / 4; // 4 bytes per dimension
+        List<Float> largeVector = Floats.asList(new float[warnDimensions + 1]);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + largeVector.size() + ">)");
+
+        VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, warnDimensions + 1);
+        ByteBuffer vectorBytes = vectorType.decompose(largeVector);
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorBytes);
+
+        createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}");
+
+        // verify that the large vector is written on initial index build
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", vectorBytes)).result.size());
+    }
+
+    /**
+     * Writes a vector above the fail threshold before the index exists, then creates the index and
+     * checks that the row is still readable from the table but is not returned by an ANN query.
+     */
+    @Test
+    public void testFailingVectorOnBuild()
+    {
+        int failDimensions = failThreshold() / 4; // 4 bytes per dimension
+        List<Float> oversizedVector = Floats.asList(new float[failDimensions + 1]);
+
+        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, v vector<float, " + oversizedVector.size() + ">)");
+
+        VectorType<Float> vectorType = VectorType.getInstance(FloatType.instance, failDimensions + 1);
+        ByteBuffer vectorBytes = vectorType.decompose(oversizedVector);
+        execute("INSERT INTO %s (k, v) VALUES (0, ?)", vectorBytes);
+
+        createIndex("CREATE INDEX ON %s(v) USING 'sai' WITH OPTIONS = {'similarity_function' : 'euclidean'}");
+
+        // verify that the oversized vector isn't written on initial index build
+        assertEquals(1, ((ResultMessage.Rows) execute("SELECT k, v FROM %s")).result.size());
+        assertEquals(0, ((ResultMessage.Rows) execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", vectorBytes)).result.size());
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/ValueThresholdTester.java 
b/test/unit/org/apache/cassandra/db/guardrails/ValueThresholdTester.java
new file mode 100644
index 0000000000..95cc599796
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/guardrails/ValueThresholdTester.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static java.lang.String.format;
+import static java.nio.ByteBuffer.allocate;
+
+public abstract class ValueThresholdTester extends ThresholdTester
+{
+    protected ValueThresholdTester(String warnThreshold,
+                                   String failThreshold,
+                                   Threshold threshold,
+                                   TriConsumer<Guardrails, String, String> 
setter,
+                                   Function<Guardrails, String> warnGetter,
+                                   Function<Guardrails, String> failGetter,
+                                   Function<Long, String> stringFormatter,
+                                   ToLongFunction<String> stringParser)
+    {
+        super(warnThreshold,
+              failThreshold,
+              threshold,
+              setter,
+              warnGetter,
+              failGetter,
+              stringFormatter,
+              stringParser);
+    }
+
+    protected abstract int warnThreshold();
+
+    protected abstract int failThreshold();
+
+    /**
+     * Tests that the max column size guardrail threshold is not applied for 
the specified 1-placeholder CQL query.
+     *
+     * @param query a CQL modification statement with exactly one placeholder
+     */
+    protected void testNoThreshold(String query) throws Throwable
+    {
+        assertValid(query, allocate(1));
+
+        assertValid(query, allocate(warnThreshold()));
+        assertValid(query, allocate(warnThreshold() + 1));
+
+        assertValid(query, allocate(failThreshold()));
+        assertValid(query, allocate(failThreshold() + 1));
+    }
+
+    /**
+     * Tests that the max column size guardrail threshold is not applied for 
the specified 2-placeholder CQL query.
+     *
+     * @param query a CQL modification statement with exactly two placeholders
+     */
+    protected void testNoThreshold2(String query) throws Throwable
+    {
+        assertValid(query, allocate(1), allocate(1));
+
+        assertValid(query, allocate(warnThreshold()), allocate(1));
+        assertValid(query, allocate(1), allocate(warnThreshold()));
+        assertValid(query, allocate((warnThreshold())), 
allocate((warnThreshold())));
+        assertValid(query, allocate(warnThreshold() + 1), allocate(1));
+        assertValid(query, allocate(1), allocate(warnThreshold() + 1));
+
+        assertValid(query, allocate(failThreshold()), allocate(1));
+        assertValid(query, allocate(1), allocate(failThreshold()));
+        assertValid(query, allocate((failThreshold())), 
allocate((failThreshold())));
+        assertValid(query, allocate(failThreshold() + 1), allocate(1));
+        assertValid(query, allocate(1), allocate(failThreshold() + 1));
+    }
+
+    /**
+     * Tests that the max column size guardrail threshold is applied for the 
specified 1-placeholder CQL query.
+     *
+     * @param column the name of the column referenced by the query placeholder
+     * @param query  a CQL query with exactly one placeholder
+     */
+    protected void testThreshold(String column, String query) throws Throwable
+    {
+        testThreshold(column, query, 0);
+    }
+
+    /**
+     * Tests that the max column size guardrail threshold is applied for the 
specified 1-placeholder CQL query.
+     *
+     * @param column             the name of the column referenced by the 
query placeholder
+     * @param query              a CQL query with exactly one placeholder
+     * @param serializationBytes the extra bytes added to the placeholder 
value by its wrapping column type serializer
+     */
+    protected void testThreshold(String column, String query, int 
serializationBytes) throws Throwable
+    {
+        int warn = warnThreshold() - serializationBytes;
+        int fail = failThreshold() - serializationBytes;
+
+        assertValid(query, allocate(0));
+        assertValid(query, allocate(warn));
+        assertWarns(column, query, allocate(warn + 1));
+        assertFails(column, query, allocate(fail + 1));
+    }
+
+    /**
+     * Tests that the max column size guardrail threshold is applied for the 
specified 2-placeholder CQL query.
+     *
+     * @param column the name of the column referenced by the placeholders
+     * @param query  a CQL query with exactly two placeholders
+     */
+    protected void testThreshold2(String column, String query) throws Throwable
+    {
+        testThreshold2(column, query, 0);
+    }
+
+    /**
+     * Tests that the max column size guardrail threshold is applied for the 
specified 2-placeholder query.
+     *
+     * @param column             the name of the column referenced by the 
placeholders
+     * @param query              a CQL query with exactly two placeholders
+     * @param serializationBytes the extra bytes added to the size of the 
placeholder values by their wrapping serializer
+     */
+    protected void testThreshold2(String column, String query, int 
serializationBytes) throws Throwable
+    {
+        int warn = warnThreshold() - serializationBytes;
+        int fail = failThreshold() - serializationBytes;
+
+        assertValid(query, allocate(0), allocate(0));
+        assertValid(query, allocate(warn), allocate(0));
+        assertValid(query, allocate(0), allocate(warn));
+        assertValid(query, allocate(warn / 2), allocate(warn / 2));
+
+        assertWarns(column, query, allocate(warn + 1), allocate(0));
+        assertWarns(column, query, allocate(0), allocate(warn + 1));
+
+        assertFails(column, query, allocate(fail + 1), allocate(0));
+        assertFails(column, query, allocate(0), allocate(fail + 1));
+    }
+
+    protected void testCollection(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable
+    {
+        assertValid(query, collectionBuilder, allocate(1));
+        assertValid(query, collectionBuilder, allocate(1), allocate(1));
+        assertValid(query, collectionBuilder, allocate(warnThreshold()));
+        assertValid(query, collectionBuilder, allocate(warnThreshold()), 
allocate(1));
+        assertValid(query, collectionBuilder, allocate(1), 
allocate(warnThreshold()));
+        assertValid(query, collectionBuilder, allocate(warnThreshold()), 
allocate(warnThreshold()));
+
+        assertWarns(column, query, collectionBuilder, allocate(warnThreshold() 
+ 1));
+        assertWarns(column, query, collectionBuilder, allocate(warnThreshold() 
+ 1), allocate(1));
+        assertWarns(column, query, collectionBuilder, allocate(1), 
allocate(warnThreshold() + 1));
+
+        assertFails(column, query, collectionBuilder, allocate(failThreshold() 
+ 1));
+        assertFails(column, query, collectionBuilder, allocate(failThreshold() 
+ 1), allocate(1));
+        assertFails(column, query, collectionBuilder, allocate(1), 
allocate(failThreshold() + 1));
+    }
+
+    protected void testFrozenCollection(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder) throws Throwable
+    {
+        assertValid(query, collectionBuilder, allocate(1));
+        assertValid(query, collectionBuilder, allocate(warnThreshold() - 8));
+        assertValid(query, collectionBuilder, allocate((warnThreshold() - 12) 
/ 2), allocate((warnThreshold() - 12) / 2));
+
+        assertWarns(column, query, collectionBuilder, allocate(warnThreshold() 
- 7));
+        assertWarns(column, query, collectionBuilder, allocate(warnThreshold() 
- 12), allocate(1));
+
+        assertFails(column, query, collectionBuilder, allocate(failThreshold() 
- 7));
+        assertFails(column, query, collectionBuilder, allocate(failThreshold() 
- 12), allocate(1));
+    }
+
+    protected void testMap(String column, String query) throws Throwable
+    {
+        assertValid(query, this::map, allocate(1), allocate(1));
+        assertValid(query, this::map, allocate(warnThreshold()), allocate(1));
+        assertValid(query, this::map, allocate(1), allocate(warnThreshold()));
+        assertValid(query, this::map, allocate(warnThreshold()), 
allocate(warnThreshold()));
+
+        assertWarns(column, query, this::map, allocate(1), 
allocate(warnThreshold() + 1));
+        assertWarns(column, query, this::map, allocate(warnThreshold() + 1), 
allocate(1));
+
+        assertFails(column, query, this::map, allocate(failThreshold() + 1), 
allocate(1));
+        assertFails(column, query, this::map, allocate(1), 
allocate(failThreshold() + 1));
+        assertFails(column, query, this::map, allocate(failThreshold() + 1), 
allocate(failThreshold() + 1));
+    }
+
+    protected void assertValid(String query, ByteBuffer... values) throws 
Throwable
+    {
+        assertValid(() -> execute(query, values));
+    }
+
+    protected void assertValid(String query, Function<ByteBuffer[], 
ByteBuffer> collectionBuilder, ByteBuffer... values) throws Throwable
+    {
+        assertValid(() -> execute(query, collectionBuilder.apply(values)));
+    }
+
+    protected void assertWarns(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) 
throws Throwable
+    {
+        assertWarns(column, query, collectionBuilder.apply(values));
+    }
+
+    protected void assertWarns(String column, String query, ByteBuffer... 
values) throws Throwable
+    {
+        String errorMessage = format("Value of column '%s' has size %s, this 
exceeds the warning threshold of %s.",
+                                     column, warnThreshold() + 1, 
warnThreshold());
+        assertWarns(() -> execute(query, values), errorMessage);
+    }
+
+    protected void assertFails(String column, String query, 
Function<ByteBuffer[], ByteBuffer> collectionBuilder, ByteBuffer... values) 
throws Throwable
+    {
+        assertFails(column, query, collectionBuilder.apply(values));
+    }
+
+    protected void assertFails(String column, String query, ByteBuffer... 
values) throws Throwable
+    {
+        String errorMessage = format("Value of column '%s' has size %s, this 
exceeds the failure threshold of %s.",
+                                     column, failThreshold() + 1, 
failThreshold());
+        assertFails(() -> execute(query, values), errorMessage);
+    }
+
+    protected ResultMessage execute(String query, ByteBuffer... values)
+    {
+        return execute(userClientState, query, Arrays.asList(values));
+    }
+
+    protected ByteBuffer set(ByteBuffer... values)
+    {
+        return SetType.getInstance(BytesType.instance, 
true).decompose(ImmutableSet.copyOf(values));
+    }
+
+    protected ByteBuffer list(ByteBuffer... values)
+    {
+        return ListType.getInstance(BytesType.instance, 
true).decompose(ImmutableList.copyOf(values));
+    }
+
+    protected ByteBuffer map(ByteBuffer... values)
+    {
+        assert values.length % 2 == 0;
+
+        int size = values.length / 2;
+        Map<ByteBuffer, ByteBuffer> m = new LinkedHashMap<>(size);
+        for (int i = 0; i < size; i++)
+            m.put(values[2 * i], values[(2 * i) + 1]);
+
+        return MapType.getInstance(BytesType.instance, BytesType.instance, 
true).decompose(m);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java 
b/test/unit/org/apache/cassandra/index/StubIndex.java
index c497c79049..cfaff698ec 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -200,7 +201,8 @@ public class StubIndex implements Index
         return 0;
     }
 
-    public void validate(PartitionUpdate update) throws InvalidRequestException
+    @Override
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {
 
     }
diff --git 
a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java 
b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index f67f2666b9..672777cc25 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -56,6 +56,7 @@ import 
org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -224,7 +225,8 @@ public class CustomCassandraIndex implements Index
         return null;
     }
 
-    public void validate(PartitionUpdate update) throws InvalidRequestException
+    @Override
+    public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {
         switch (indexedColumn.kind)
         {
diff --git 
a/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java 
b/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java
index 55390b3f5c..91bff0de97 100644
--- a/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/cql/AllTypesSimpleEqTest.java
@@ -33,7 +33,6 @@ import accord.utils.Gens;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.DecimalType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.index.sai.StorageAttachedIndex;
 import org.apache.cassandra.utils.AbstractTypeGenerators;
 import org.apache.cassandra.utils.Generators;
@@ -42,10 +41,8 @@ import org.quicktheories.generators.SourceDSL;
 @RunWith(Parameterized.class)
 public class AllTypesSimpleEqTest extends AbstractSimpleEqTestBase
 {
-    private static final Map<AbstractType<?>, Long> LARGE_DOMAIN_FAILING_SEEDS 
=
-        Map.of(UTF8Type.instance, -4379508235061872764L);
-    private static final Map<AbstractType<?>, Long> SHORT_DOMAIN_FAILING_SEEDS 
=
-        Map.of(UTF8Type.instance, -4379508235061872764L);
+    private static final Map<AbstractType<?>, Long> LARGE_DOMAIN_FAILING_SEEDS 
= Map.of();
+    private static final Map<AbstractType<?>, Long> SHORT_DOMAIN_FAILING_SEEDS 
= Map.of();
 
     private final AbstractType<?> type;
 
@@ -59,8 +56,6 @@ public class AllTypesSimpleEqTest extends 
AbstractSimpleEqTestBase
     {
         return StorageAttachedIndex.SUPPORTED_TYPES.stream()
                                                    .map(CQL3Type::getType)
-                                                   // TODO: Track down unicode 
character edge cases...
-                                                   .filter(t -> t != 
UTF8Type.instance)
                                                    .distinct()
                                                    .map(t -> new Object[]{ t })
                                                    
.collect(Collectors.toList());
diff --git 
a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java 
b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
index b7e61e3d7e..cc2e2f4b13 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
@@ -21,7 +21,6 @@
 package org.apache.cassandra.index.sai.cql;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Iterator;
@@ -41,7 +40,6 @@ import com.datastax.driver.core.ResultSet;
 import 
com.datastax.driver.core.exceptions.InvalidConfigurationInQueryException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.ReadFailureException;
-import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -78,7 +76,6 @@ import org.apache.cassandra.inject.InvokePointBuilder;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Throwables;
 import org.assertj.core.api.Assertions;
 import org.mockito.Mockito;
@@ -633,50 +630,6 @@ public class StorageAttachedIndexDDLTest extends SAITester
         assertThatThrownBy(() -> executeNet("SELECT id1 FROM %s WHERE 
v1>=0")).isInstanceOf(ReadFailureException.class);
     }
 
-    @Test
-    public void testMaxTermSize() throws Throwable
-    {
-        String largeTerm = 
UTF8Type.instance.compose(ByteBuffer.allocate(FBUtilities.MAX_UNSIGNED_SHORT / 
2 + 1));
-        int maxFloatVectorDimensions = (int) 
(CassandraRelevantProperties.SAI_MAX_VECTOR_TERM_SIZE.getSizeInBytes() / 4); // 
4 bytes per dimension
-        Vector<Float> largeVector = vector(new float[maxFloatVectorDimensions 
+ 1]);
-
-        createTable(KEYSPACE, "CREATE TABLE %s (k int PRIMARY KEY, r text, m 
map<text, text>, v vector<float, " + largeVector.size() + ">)");
-        createIndex("CREATE CUSTOM INDEX ON %s(r) USING 
'StorageAttachedIndex'");
-        createIndex("CREATE CUSTOM INDEX ON %s(ENTRIES(m)) USING 
'StorageAttachedIndex'");
-        createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex' 
WITH OPTIONS = {'similarity_function' : 'euclidean'}");
-
-        // verify that a write exceeding max term size is accepted with client 
warnings
-        ResultSet resultSet = executeNet("INSERT INTO %s (k, r, m, v) VALUES 
(0, ?, {'" + largeTerm + "': ''}, " + largeVector + ')', largeTerm);
-        List<String> warnings = resultSet.getExecutionInfo().getWarnings();
-        warnings.sort(String::compareTo);
-        assertEquals(3, warnings.size());
-        assertTrue(warnings.get(0).contains("Can't add term of column m"));
-        assertTrue(warnings.get(1).contains("Can't add term of column r"));
-        assertTrue(warnings.get(2).contains("Can't add term of column v"));
-
-        // verify that the large terms aren't written into the memtable indexes
-        assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, 
map(largeTerm, ""), largeVector));
-        assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm));
-        assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm));
-        assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", 
largeVector));
-
-        // verify that the large terms aren't written into the sstable indexes 
after flush
-        flush();
-        assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, 
map(largeTerm, ""), largeVector));
-        assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm));
-        assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm));
-        assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", 
largeVector));
-
-        // verify that the large terms aren't written into the sstable indexes 
after compactions
-        executeNet("INSERT INTO %s (k, r, m, v) VALUES (0, ?, {'" + largeTerm 
+ "': ''}, " + largeVector + ')', largeTerm);
-        flush();
-        compact();
-        assertRows(execute("SELECT k, r, m, v FROM %s"), row(0, largeTerm, 
map(largeTerm, ""), largeVector));
-        assertEmpty(execute("SELECT * FROM %s WHERE r = ?", largeTerm));
-        assertEmpty(execute("SELECT * FROM %s WHERE m[?] = ''", largeTerm));
-        assertEmpty(execute("SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 10", 
largeVector));
-    }
-
     @Test
     public void shouldReleaseIndexFilesAfterDroppingLastIndex() throws 
Throwable
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to