This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 83203a14c400ff99cfb2a5b7e655a663ea882c2b
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Fri Jul 14 01:44:26 2023 -0700

    Importer should build SSTable indexes successfully before making new 
SSTables readable
    
    - Avoid validation in response to SSTableAddedNotification, as it should 
already have been done somewhere else
    - Change SSTableWriter to prevent commit when a failure is thrown out of an 
index build
    
    patch by Caleb Rackliffe; reviewed by Mike Adamson and Andres de la Peña 
for CASSANDRA-18670
---
 .../cassandra/config/DatabaseDescriptor.java       |   6 +
 .../org/apache/cassandra/db/SSTableImporter.java   |   5 +
 .../db/streaming/CassandraIncomingFile.java        |   8 +
 .../db/streaming/CassandraStreamReceiver.java      |  16 +-
 src/java/org/apache/cassandra/index/Index.java     |  38 ++-
 .../org/apache/cassandra/index/IndexRegistry.java  |   8 +-
 .../cassandra/index/SecondaryIndexManager.java     | 111 +++++++-
 .../apache/cassandra/index/sai/IndexContext.java   |   1 -
 .../cassandra/index/sai/SSTableContextManager.java |   1 -
 .../cassandra/index/sai/StorageAttachedIndex.java  |   6 +
 .../index/sai/StorageAttachedIndexBuilder.java     |   8 +-
 .../index/sai/StorageAttachedIndexGroup.java       |  42 ++-
 .../index/sai/disk/StorageAttachedIndexWriter.java |  24 +-
 .../index/sai/disk/format/IndexDescriptor.java     |  42 ++-
 .../index/sai/disk/format/OnDiskFormat.java        |  11 +-
 .../index/sai/disk/v1/V1OnDiskFormat.java          |  51 ++--
 .../org/apache/cassandra/index/sasi/SASIIndex.java |   1 +
 .../cassandra/io/sstable/format/SSTableWriter.java |  11 +-
 .../test/sai/ImportIndexedSSTablesTest.java        | 283 +++++++++++++++++++++
 .../test/sai/IndexStreamingFailureTest.java        | 199 +++++++++++++++
 .../org/apache/cassandra/index/sai/SAITester.java  |   2 +-
 .../index/sai/cql/StorageAttachedIndexDDLTest.java |  31 ++-
 .../sai/metrics/SegmentFlushingFailureTester.java  |  34 ++-
 23 files changed, 826 insertions(+), 113 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f288baefbf..4e5a26fafc 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3552,6 +3552,12 @@ public class DatabaseDescriptor
         return conf.stream_entire_sstables;
     }
 
+    @VisibleForTesting
+    public static boolean setStreamEntireSSTables(boolean value)
+    {
+        return conf.stream_entire_sstables = value;
+    }
+
     public static DurationSpec.LongMillisecondsBound 
getStreamTransferTaskTimeout()
     {
         return conf.stream_transfer_task_timeout;
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java 
b/src/java/org/apache/cassandra/db/SSTableImporter.java
index b7cf3b7718..c2544f7d06 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -186,6 +186,11 @@ public class SSTableImporter
         try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
         {
             abortIfDraining();
+
+            // Validate existing SSTable-attached indexes, and then build any 
that are missing:
+            if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, 
false))
+                
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);
+
             cfs.getTracker().addSSTables(newSSTables);
             for (SSTableReader reader : newSSTables)
             {
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 3e87d67fef..e8a6fbcc7c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -47,6 +47,8 @@ public class CassandraIncomingFile implements IncomingStream
     private volatile long size = -1;
     private volatile int numFiles = 1;
 
+    private volatile boolean isEntireSSTable = false;
+
     private static final Logger logger = 
LoggerFactory.getLogger(CassandraIncomingFile.class);
 
     public CassandraIncomingFile(ColumnFamilyStore cfs, StreamSession session, 
StreamMessageHeader header)
@@ -72,6 +74,7 @@ public class CassandraIncomingFile implements IncomingStream
         IStreamReader reader;
         if (streamHeader.isEntireSSTable)
         {
+            isEntireSSTable = true;
             reader = new CassandraEntireSSTableStreamReader(header, 
streamHeader, session);
             numFiles = streamHeader.componentManifest.components().size();
         }
@@ -103,6 +106,11 @@ public class CassandraIncomingFile implements 
IncomingStream
         return numFiles;
     }
 
+    public boolean isEntireSSTable()
+    {
+        return isEntireSSTable;
+    }
+    
     @Override
     public TableId getTableId()
     {
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
index 4b5de8b8c9..5721d0b2e4 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -68,7 +68,9 @@ public class CassandraStreamReceiver implements StreamReceiver
     private final LifecycleTransaction txn;
 
     //  holds references to SSTables received
-    protected Collection<SSTableReader> sstables;
+    protected final Collection<SSTableReader> sstables;
+
+    protected volatile boolean receivedEntireSSTable;
 
     private final boolean requiresWritePath;
 
@@ -114,6 +116,7 @@ public class CassandraStreamReceiver implements 
StreamReceiver
         }
         txn.update(finished, false);
         sstables.addAll(finished);
+        receivedEntireSSTable = file.isEntireSSTable();
     }
 
     @Override
@@ -223,7 +226,7 @@ public class CassandraStreamReceiver implements 
StreamReceiver
         }
     }
 
-    public synchronized  void finishTransaction()
+    public synchronized void finishTransaction()
     {
         txn.finish();
     }
@@ -242,9 +245,16 @@ public class CassandraStreamReceiver implements 
StreamReceiver
             }
             else
             {
+                // Validate SSTable-attached indexes that should have streamed 
in an already complete state. When we
+                // don't stream the entire SSTable, validation is unnecessary, 
as the indexes have just been written
+                // via the SSTable flush observer, and an error there would 
have aborted the streaming transaction.
+                if (receivedEntireSSTable)
+                    // If we do validate, any exception thrown doing so will 
also abort the streaming transaction:
+                    cfs.indexManager.validateSSTableAttachedIndexes(readers, 
true);
+
                 finishTransaction();
 
-                // add sstables (this will build secondary indexes too, see 
CASSANDRA-10130)
+                // add sstables (this will build non-SSTable-attached 
secondary indexes too, see CASSANDRA-10130)
                 logger.debug("[Stream #{}] Received {} sstables from {} ({})", 
session.planId(), readers.size(), session.peer, readers);
                 cfs.addSSTables(readers);
 
diff --git a/src/java/org/apache/cassandra/index/Index.java 
b/src/java/org/apache/cassandra/index/Index.java
index 5ca9e719a2..f116fdb3e0 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -20,6 +20,7 @@
  */
 package org.apache.cassandra.index;
 
+import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
@@ -61,7 +62,6 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 
-
 /**
  * Consisting of a top level Index interface and two sub-interfaces which 
handle read and write operations,
  * Searcher and Indexer respectively, this defines a secondary index 
implementation.
@@ -341,13 +341,28 @@ public interface Index
      * Return true if this index can be built or rebuilt when the index 
manager determines it is necessary. Returning
      * false enables the index implementation (or some other component) to 
control if and when SSTable data is
      * incorporated into the index.
-     *
+     * <p>
      * This is called by SecondaryIndexManager in buildIndexBlocking, 
buildAllIndexesBlocking and rebuildIndexesBlocking
     * where a return value of false causes the index to be excluded from the 
set of those which will process the
      * SSTable data.
      * @return if the index should be included in the set which processes 
SSTable data, false otherwise.
      */
-    public boolean shouldBuildBlocking();
+    boolean shouldBuildBlocking();
+
+    /**
+     * For an index to qualify as SSTable-attached, it must do two things:
+     * <p>
+     * 1.) It must use {@link SSTableFlushObserver} to incrementally build 
indexes as SSTables are written. This ensures
+     *     that non-entire file streaming builds them correctly before the 
streaming transaction finishes. 
+     * <p> 
+     * 2.) Its implementation of {@link SecondaryIndexBuilder} must support 
incremental building by SSTable.
+     * 
+     * @return true if the index builds SSTable-attached on-disk components
+     */
+    default boolean isSSTableAttached()
+    {
+        return false;
+    }
 
     /**
      * Get flush observer to observe partition/cell events generated by 
flushing SSTable (memtable flush or compaction).
@@ -765,6 +780,23 @@ public interface Index
          * @return the SSTable components created by this group
          */
         Set<Component> getComponents();
+
+        /**
+         * Validates all indexes in the group against the specified SSTables. 
+         * 
+         * @param sstables SSTables for which indexes in the group should be 
built
+         * @param throwOnIncomplete whether to throw an error if any index in 
the group is incomplete
+         * 
+         * @return true if all indexes in the group are complete and valid
+         *         false if any index is incomplete and {@code 
throwOnIncomplete} is false 
+         * 
+         * @throws IllegalStateException if {@code throwOnIncomplete} is true 
and any index in the group is incomplete
+         * @throws UncheckedIOException if there is a problem validating any 
on-disk component of an index in the group
+         */
+        default boolean 
validateSSTableAttachedIndexes(Collection<SSTableReader> sstables, boolean 
throwOnIncomplete)
+        {
+            return true;
+        }
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java 
b/src/java/org/apache/cassandra/index/IndexRegistry.java
index 4d49c693d6..308aeacd7a 100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -61,7 +61,7 @@ public interface IndexRegistry
     /**
      * An empty {@code IndexRegistry}
      */
-    public static final IndexRegistry EMPTY = new IndexRegistry()
+    IndexRegistry EMPTY = new IndexRegistry()
     {
         @Override
         public void registerIndex(Index index, Object groupKey, 
Supplier<Index.Group> groupSupplier)
@@ -104,7 +104,7 @@ public interface IndexRegistry
      * but enables query validation and preparation to succeed. Useful for 
tools which need to prepare
      * CQL statements without instantiating the whole ColumnFamilyStore 
infrastructure.
      */
-    public static final IndexRegistry NON_DAEMON = new IndexRegistry()
+    IndexRegistry NON_DAEMON = new IndexRegistry()
     {
         final Index index = new Index()
         {
@@ -279,7 +279,7 @@ public interface IndexRegistry
     {
         registerIndex(index, index, () -> new SingletonIndexGroup(index));
     }
-    public void registerIndex(Index index, Object groupKey, 
Supplier<Index.Group> groupSupplier);
+    void registerIndex(Index index, Object groupKey, Supplier<Index.Group> 
groupSupplier);
     Collection<Index.Group> listIndexGroups();
 
     Index getIndex(IndexMetadata indexMetadata);
@@ -304,7 +304,7 @@ public interface IndexRegistry
      * @param table the table metadata
      * @return the {@code IndexRegistry} associated to the specified table
      */
-    public static IndexRegistry obtain(TableMetadata table)
+    static IndexRegistry obtain(TableMetadata table)
     {
         if (!DatabaseDescriptor.isDaemonInitialized())
             return NON_DAEMON;
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 08c1a5fa6d..19985c304f 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.index;
 
+import java.io.UncheckedIOException;
 import java.lang.reflect.Constructor;
 import java.util.*;
 import java.util.concurrent.Callable;
@@ -63,7 +64,10 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.Index.IndexBuildingSupport;
 import org.apache.cassandra.index.internal.CassandraIndex;
-import org.apache.cassandra.index.transactions.*;
+import org.apache.cassandra.index.transactions.CleanupTransaction;
+import org.apache.cassandra.index.transactions.CompactionTransaction;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
@@ -492,8 +496,93 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
     }
 
     /**
-     * Performs a blocking (re)indexing/recovery of the specified SSTables for 
the specified indexes.
+     * Validates all index groups against the specified SSTables. 
+     *
+     * @param sstables SSTables for which indexes in the group should be built
+     * @param throwOnIncomplete whether to throw an error if any index in the 
group is incomplete
+     *
+     * @return true if all indexes in all groups are complete and valid
+     *         false if an index in any group is incomplete and {@code 
throwOnIncomplete} is false 
      *
+     * @throws IllegalStateException if {@code throwOnIncomplete} is true and 
an index in any group is incomplete
+     * @throws UncheckedIOException if there is a problem validating any 
on-disk component in any group
+     */
+    public boolean validateSSTableAttachedIndexes(Collection<SSTableReader> 
sstables, boolean throwOnIncomplete)
+    {
+        boolean complete = true;
+
+        for (Index.Group group : indexGroups.values())
+        {
+            if (group.getIndexes().stream().anyMatch(Index::isSSTableAttached))
+                complete &= group.validateSSTableAttachedIndexes(sstables, 
throwOnIncomplete);
+        }
+
+        return complete;
+    }
+
+    /**
+     * Incrementally builds indexes for the specified SSTables in a blocking 
fashion.
+     * <p>
+     * This is similar to {@link #buildIndexesBlocking}, but it is designed to 
be used in cases where failure will
+     * cascade through to failing the containing operation that actuates the 
build. (ex. streaming and SSTable import)
+     * <p>
+     * It does not update index build status or queryability on failure or 
success and does not call
+     * {@link #flushIndexesBlocking(Set, FutureCallback)}, as this is an 
artifact of the legacy non-SSTable-attached
+     * index implementation.
+     *
+     * @param sstables the SSTables for which indexes must be built
+     */
+    public void buildSSTableAttachedIndexesBlocking(Collection<SSTableReader> 
sstables)
+    {
+        Set<Index> toBuild = 
indexes.values().stream().filter(Index::isSSTableAttached).collect(Collectors.toSet());
+
+        if (toBuild.isEmpty())
+            return;
+
+        logger.info("Submitting incremental index build of {} for data in 
{}...",
+                    commaSeparated(toBuild),
+                    
sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
+
+        // Group all building tasks
+        Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
+        for (Index index : toBuild)
+        {
+            Set<Index> stored = 
byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
+            stored.add(index);
+        }
+
+        // Schedule all index building tasks with callbacks to handle success 
and failure
+        List<Future<?>> futures = new ArrayList<>(byType.size());
+        byType.forEach((buildingSupport, groupedIndexes) ->
+        {
+            SecondaryIndexBuilder builder = 
buildingSupport.getIndexBuildTask(baseCfs, groupedIndexes, sstables, false);
+            AsyncPromise<Object> build = new AsyncPromise<>();
+            
CompactionManager.instance.submitIndexBuild(builder).addCallback(new 
FutureCallback<Object>()
+            {
+                @Override
+                public void onFailure(Throwable t)
+                {
+                    logger.warn("Failed to incrementally build indexes {}", 
getIndexNames(groupedIndexes));
+                    build.tryFailure(t);
+                }
+
+                @Override
+                public void onSuccess(Object o)
+                {
+                    logger.info("Incremental index build of {} completed", 
getIndexNames(groupedIndexes));
+                    build.trySuccess(o);
+                }
+            });
+            futures.add(build);
+        });
+
+        // Finally wait for the index builds to finish
+        FBUtilities.waitOnFutures(futures);
+    }
+
+    /**
+     * Performs a blocking (re)indexing/recovery of the specified SSTables for 
the specified indexes.
+     * <p>
      * If the index doesn't support ALL {@link Index.LoadType} it performs a 
recovery {@link Index#getRecoveryTaskSupport()}
      * instead of a build {@link Index#getBuildTaskSupport()}
      * 
@@ -501,7 +590,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
      * @param indexes       the indexes to be (re)built for the specified 
SSTables
      * @param isFullRebuild True if this method is invoked as a full index 
rebuild, false otherwise
      */
-    @SuppressWarnings({ "unchecked" })
+    @SuppressWarnings({"unchecked", "RedundantSuppression"})
     private void buildIndexesBlocking(Collection<SSTableReader> sstables, 
Set<Index> indexes, boolean isFullRebuild)
     {
         if (indexes.isEmpty())
@@ -542,7 +631,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
                            {
                                SecondaryIndexBuilder builder = 
buildingSupport.getIndexBuildTask(baseCfs, groupedIndexes, sstables, 
isFullRebuild);
                                final AsyncPromise<Object> build = new 
AsyncPromise<>();
-                               
CompactionManager.instance.submitIndexBuild(builder).addCallback(new 
FutureCallback()
+                               
CompactionManager.instance.submitIndexBuild(builder).addCallback(new 
FutureCallback<Object>()
                                {
                                    @Override
                                    public void onFailure(Throwable t)
@@ -584,11 +673,11 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
                 }
 
                 // Flush all built indexes with an asynchronous callback to log 
the success or failure of the flush
-                flushIndexesBlocking(builtIndexes, new FutureCallback()
+                flushIndexesBlocking(builtIndexes, new FutureCallback<>()
                 {
-                    String indexNames = StringUtils.join(builtIndexes.stream()
-                                                                     .map(i -> 
i.getIndexMetadata().name)
-                                                                     
.collect(Collectors.toList()), ',');
+                    final String indexNames = 
StringUtils.join(builtIndexes.stream()
+                                                                           
.map(i -> i.getIndexMetadata().name)
+                                                                           
.collect(Collectors.toList()), ',');
 
                     @Override
                     public void onFailure(Throwable ignored)
@@ -886,7 +975,7 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
     {
         executeAllBlocking(indexes.values()
                                   .stream()
-                                  .filter(index -> 
!index.getBackingTable().isPresent()),
+                                  .filter(index -> 
index.getBackingTable().isEmpty()),
                            index -> 
index.getBlockingFlushTask(baseCfsMemtable),
                            null);
     }
@@ -1203,7 +1292,6 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
      * implementations
      *
      * @param update PartitionUpdate containing the values to be validated by 
registered Index implementations
-     * @throws InvalidRequestException
      */
     public void validate(PartitionUpdate update) throws InvalidRequestException
     {
@@ -1701,11 +1789,12 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
             SSTableAddedNotification notice = (SSTableAddedNotification) 
notification;
 
             // SSTables associated to a memtable come from a flush, so their 
contents have already been indexed
-            if (!notice.memtable().isPresent())
+            if (notice.memtable().isEmpty())
                 buildIndexesBlocking(Lists.newArrayList(notice.added),
                                      indexes.values()
                                             .stream()
                                             .filter(Index::shouldBuildBlocking)
+                                            .filter(i -> 
!i.isSSTableAttached())
                                             .collect(Collectors.toSet()),
                                      false);
         }
diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java 
b/src/java/org/apache/cassandra/index/sai/IndexContext.java
index d0a5e08b16..ac33c837da 100644
--- a/src/java/org/apache/cassandra/index/sai/IndexContext.java
+++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -422,7 +422,6 @@ public class IndexContext
                 {
                     if 
(!sstableContext.indexDescriptor.validatePerIndexComponents(this, validation))
                     {
-                        logger.warn(logMessage("Invalid per-column component 
for SSTable {}"), sstableContext.descriptor());
                         invalid.add(sstableContext);
                         continue;
                     }
diff --git a/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java 
b/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java
index 85192f525e..1c6406e0c1 100644
--- a/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java
+++ b/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java
@@ -79,7 +79,6 @@ public class SSTableContextManager
                 // Only validate on restart or newly refreshed SSTable. Newly 
built files are unlikely to be corrupted.
                 if (!sstableContexts.containsKey(sstable) && 
!indexDescriptor.validatePerSSTableComponents(validation))
                 {
-                    logger.warn(indexDescriptor.logMessage("Invalid 
per-SSTable component for SSTable {}"), sstable.descriptor);
                     invalid.add(sstable);
                     removeInvalidSSTableContext(sstable);
                     continue;
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index 5a8f7b6a58..e8c4b53f4e 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -469,6 +469,12 @@ public class StorageAttachedIndex implements Index
         return true;
     }
 
+    @Override
+    public boolean isSSTableAttached()
+    {
+        return true;
+    }
+
     @Override
     public Optional<ColumnFamilyStore> getBackingTable()
     {
diff --git 
a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java
index 76240c6094..c45b604ffc 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java
@@ -99,7 +99,10 @@ public class StorageAttachedIndexBuilder extends 
SecondaryIndexBuilder
     @Override
     public void build()
     {
-        logger.debug(logMessage("Starting full index build"));
+        logger.debug(logMessage(String.format("Starting %s %s index build...",
+                                              isInitialBuild ? "initial" : 
"non-initial",
+                                              isFullRebuild ? "full" : 
"partial")));
+
         for (Map.Entry<SSTableReader, Set<StorageAttachedIndex>> e : 
sstables.entrySet())
         {
             SSTableReader sstable = e.getKey();
@@ -113,9 +116,7 @@ public class StorageAttachedIndexBuilder extends 
SecondaryIndexBuilder
             }
 
             if (indexSSTable(sstable, existing))
-            {
                 return;
-            }
         }
     }
 
@@ -205,7 +206,6 @@ public class StorageAttachedIndexBuilder extends 
SecondaryIndexBuilder
 
             if (t instanceof InterruptedException)
             {
-                // TODO: Is there anything that makes more sense than just 
restoring the interrupt?
                 logger.warn(logMessage("Interrupted while building indexes {} 
on SSTable {}"), indexes, sstable.descriptor);
                 Thread.currentThread().interrupt();
                 return true;
diff --git 
a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
index 7dec78c4da..fb59244a78 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
@@ -249,9 +249,9 @@ public class StorageAttachedIndexGroup implements 
Index.Group, INotificationCons
         {
             SSTableAddedNotification notice = (SSTableAddedNotification) 
notification;
 
-            // Avoid validation for index files just written following 
Memtable flush.
-            IndexValidation validate = notice.memtable().isPresent() ? 
IndexValidation.NONE : IndexValidation.CHECKSUM;
-            onSSTableChanged(Collections.emptySet(), notice.added, indexes, 
validate);
+            // Avoid validation for index files just written following 
Memtable flush. Otherwise, the new SSTables have
+            // come either from import, streaming, or a standalone tool, where 
they have also already been validated.
+            onSSTableChanged(Collections.emptySet(), notice.added, indexes, 
IndexValidation.NONE);
         }
         else if (notification instanceof SSTableListChangedNotification)
         {
@@ -335,6 +335,42 @@ public class StorageAttachedIndexGroup implements 
Index.Group, INotificationCons
         return incomplete;
     }
 
+    @Override
+    public boolean validateSSTableAttachedIndexes(Collection<SSTableReader> 
sstables, boolean throwOnIncomplete)
+    {
+        boolean complete = true;
+
+        for (SSTableReader sstable : sstables)
+        {
+            IndexDescriptor indexDescriptor = IndexDescriptor.create(sstable);
+
+            if (indexDescriptor.isPerSSTableIndexBuildComplete())
+            {
+                indexDescriptor.checksumPerSSTableComponents();
+
+                for (StorageAttachedIndex index : indexes)
+                {
+                    if 
(indexDescriptor.isPerColumnIndexBuildComplete(index.getIndexContext()))
+                        
indexDescriptor.checksumPerIndexComponents(index.getIndexContext());
+                    else if (throwOnIncomplete)
+                        throw new 
IllegalStateException(indexDescriptor.logMessage("Incomplete per-column index 
build"));
+                    else
+                        complete = false;
+                }
+            }
+            else if (throwOnIncomplete)
+            {
+                throw new 
IllegalStateException(indexDescriptor.logMessage("Incomplete per-SSTable index 
build"));
+            }
+            else
+            {
+                complete = false;
+            }
+        }
+
+        return complete;    
+    }
+
     /**
      * open index files by checking number of {@link SSTableContext} and 
{@link SSTableIndex},
      * so transient open files during validation and files that are still open 
for in-flight requests will not be tracked.
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
index f315e0dbba..a1340c3507 100644
--- 
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
+++ 
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
@@ -48,13 +48,10 @@ public class StorageAttachedIndexWriter implements 
SSTableFlushObserver
     private static final Logger logger = 
LoggerFactory.getLogger(StorageAttachedIndexWriter.class);
 
     private final IndexDescriptor indexDescriptor;
-    private final Collection<StorageAttachedIndex> indexes;
     private final Collection<PerColumnIndexWriter> perIndexWriters;
     private final PerSSTableIndexWriter perSSTableWriter;
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
     private final RowMapping rowMapping;
-    private final boolean propagateErrors;
-
     private DecoratedKey currentKey;
     private boolean tokenOffsetWriterCompleted = false;
     private boolean aborted = false;
@@ -65,7 +62,7 @@ public class StorageAttachedIndexWriter implements 
SSTableFlushObserver
                                                                        
Collection<StorageAttachedIndex> indexes,
                                                                        
LifecycleNewTracker lifecycleNewTracker) throws IOException
     {
-        return new StorageAttachedIndexWriter(indexDescriptor, indexes, 
lifecycleNewTracker, false, false);
+        return new StorageAttachedIndexWriter(indexDescriptor, indexes, 
lifecycleNewTracker, false);
 
     }
 
@@ -74,19 +71,16 @@ public class StorageAttachedIndexWriter implements 
SSTableFlushObserver
                                                                  
LifecycleNewTracker lifecycleNewTracker,
                                                                  boolean 
perIndexComponentsOnly) throws IOException
     {
-        return new StorageAttachedIndexWriter(indexDescriptor, indexes, 
lifecycleNewTracker, perIndexComponentsOnly, true);
+        return new StorageAttachedIndexWriter(indexDescriptor, indexes, 
lifecycleNewTracker, perIndexComponentsOnly);
     }
 
     private StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
                                        Collection<StorageAttachedIndex> 
indexes,
                                        LifecycleNewTracker lifecycleNewTracker,
-                                       boolean perIndexComponentsOnly,
-                                       boolean propagateErrors) throws 
IOException
+                                       boolean perIndexComponentsOnly) throws 
IOException
     {
         this.indexDescriptor = indexDescriptor;
-        this.indexes = indexes;
         this.rowMapping = RowMapping.create(lifecycleNewTracker.opType());
-        this.propagateErrors = propagateErrors;
         this.perIndexWriters = indexes.stream().map(index -> 
indexDescriptor.newPerColumnIndexWriter(index,
                                                                                
                      lifecycleNewTracker,
                                                                                
                      rowMapping))
@@ -209,13 +203,11 @@ public class StorageAttachedIndexWriter implements 
SSTableFlushObserver
      */
     public void abort(Throwable accumulator, boolean fromIndex)
     {
+        if (aborted) return;
+
         // Mark the write operation aborted, so we can short-circuit any 
further operations on the component writers.
         aborted = true;
         
-        // Make any indexes involved in this transaction non-queryable, as 
they will likely not match the backing table.
-        if (fromIndex)
-            indexes.forEach(StorageAttachedIndex::makeIndexNonQueryable);
-        
         for (PerColumnIndexWriter perIndexWriter : perIndexWriters)
         {
             try
@@ -237,9 +229,9 @@ public class StorageAttachedIndexWriter implements 
SSTableFlushObserver
             perSSTableWriter.abort();
         }
 
-        // If the abort was from an index error and this is part of an index 
build operation then
-        // propagate the error up to the SecondaryIndexManager, so it can be 
correctly marked as failed.
-        if (fromIndex && propagateErrors)
+        // If the abort was from an index error, propagate the error upstream 
so index builds, compactions, and 
+        // flushes can handle it correctly.
+        if (fromIndex)
             throw Throwables.unchecked(accumulator);
     }
 
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java 
b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
index 5aa2a985d5..88680d9b09 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.index.sai.disk.format;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -54,12 +55,12 @@ import org.apache.lucene.util.IOUtils;
 /**
  * The {@link IndexDescriptor} is an analog of the SSTable {@link Descriptor} 
and provides version
  * specific information about the on-disk state of a {@link 
StorageAttachedIndex}.
- *
+ * <p>
  * The {@link IndexDescriptor} is primarily responsible for maintaining a view 
of the on-disk state
  * of an index for a specific {@link org.apache.cassandra.io.sstable.SSTable}.
- *
+ * <p>
  * It is responsible for opening files for use by writers and readers.
- *
+ * <p>
  * Its remaining responsibility is to act as a proxy to the {@link 
OnDiskFormat} associated with the
  * index {@link Version}.
  */
@@ -170,13 +171,11 @@ public class IndexDescriptor
         return isPerColumnIndexBuildComplete(indexContext) && 
numberOfPerIndexComponents(indexContext) == 1;
     }
 
-    @SuppressWarnings("UnstableApiUsage")
     public void createComponentOnDisk(IndexComponent component) throws 
IOException
     {
         Files.touch(fileFor(component).toJavaIOFile());
     }
 
-    @SuppressWarnings("UnstableApiUsage")
     public void createComponentOnDisk(IndexComponent component, IndexContext 
indexContext) throws IOException
     {
         Files.touch(fileFor(component, indexContext).toJavaIOFile());
@@ -335,7 +334,16 @@ public class IndexDescriptor
 
         logger.info(indexContext.logMessage("Validating per-column index 
components using mode " + validation));
         boolean checksum = validation == IndexValidation.CHECKSUM;
-        return version.onDiskFormat().validatePerColumnIndexComponents(this, 
indexContext, checksum);
+
+        try
+        {
+            version.onDiskFormat().validatePerColumnIndexComponents(this, 
indexContext, checksum);
+            return true;
+        }
+        catch (UncheckedIOException e)
+        {
+            return false;
+        }
     }
 
     @SuppressWarnings("BooleanMethodIsAlwaysInverted")
@@ -346,7 +354,27 @@ public class IndexDescriptor
 
         logger.info(logMessage("Validating per-sstable index components using 
mode " + validation));
         boolean checksum = validation == IndexValidation.CHECKSUM;
-        return version.onDiskFormat().validatePerSSTableIndexComponents(this, 
checksum);
+
+        try
+        {
+            version.onDiskFormat().validatePerSSTableIndexComponents(this, 
checksum);
+            return true;
+        }
+        catch (UncheckedIOException e)
+        {
+            return false;
+        }
+    }
+
+    public void checksumPerIndexComponents(IndexContext indexContext)
+    {
+        version.onDiskFormat().validatePerColumnIndexComponents(this, 
indexContext, true);
+    }
+
+    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+    public void checksumPerSSTableComponents()
+    {
+        version.onDiskFormat().validatePerSSTableIndexComponents(this, true);
     }
 
     public void deletePerSSTableIndexComponents()
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java 
b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
index cf19dceb6b..32551b3a16 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.index.sai.disk.format;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Set;
 
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
@@ -35,7 +36,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 /**
  * An interface to the on-disk format of an index. This provides format 
agnostic methods
  * to read and write an on-disk format.
- *
+ * <p>
  * The methods on this interface can be logically mapped into the following 
groups
  * based on their method parameters:
  * <ul>
@@ -116,9 +117,9 @@ public interface OnDiskFormat
      * @param indexDescriptor The {@link IndexDescriptor} for the SSTable SAI 
index
      * @param checksum {@code true} if the checksum should be tested as part 
of the validation
      *
-     * @return true if all the per-SSTable components are valid
+     * @throws UncheckedIOException if there is a problem validating any 
on-disk component
      */
-    boolean validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, 
boolean checksum);
+    void validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, 
boolean checksum);
 
     /**
      * Validate all the per-column on-disk components and throw if a component 
is not valid
@@ -127,9 +128,9 @@ public interface OnDiskFormat
      * @param indexContext The {@link IndexContext} holding the per-index 
information for the index
      * @param checksum {@code true} if the checksum should be tested as part 
of the validation
      *
-     * @return true if all the per-column components are valid
+     * @throws UncheckedIOException if there is a problem validating any 
on-disk component
      */
-    boolean validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, 
IndexContext indexContext, boolean checksum);
+    void validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, 
IndexContext indexContext, boolean checksum);
 
     /**
      * Returns the set of {@link IndexComponent} for the per-SSTable part of 
an index.
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index 763e1af102..86dcd24675 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -19,10 +19,12 @@
 package org.apache.cassandra.index.sai.disk.v1;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.EnumSet;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,13 +77,13 @@ public class V1OnDiskFormat implements OnDiskFormat
 
     /**
      * Global limit on heap consumed by all index segment building that occurs 
outside the context of Memtable flush.
-     *
+     * <p>
      * Note that to avoid flushing small index segments, a segment is only 
flushed when
      * both the global size of all building segments has breached the limit 
and the size of the
      * segment in question reaches (segment_write_buffer_space_mb / # 
currently building column indexes).
-     *
+     * <p>
      * ex. If there is only one column index building, it can buffer up to 
segment_write_buffer_space_mb.
-     *
+     * <p>
      * ex. If there is one column index building per table across 8 
compactors, each index will be
      *     eligible to flush once it reaches (segment_write_buffer_space_mb / 
8) MBs.
      */
@@ -161,7 +163,7 @@ public class V1OnDiskFormat implements OnDiskFormat
     }
 
     @Override
-    public boolean validatePerSSTableIndexComponents(IndexDescriptor 
indexDescriptor, boolean checksum)
+    public void validatePerSSTableIndexComponents(IndexDescriptor 
indexDescriptor, boolean checksum)
     {
         for (IndexComponent indexComponent : perSSTableIndexComponents())
         {
@@ -174,22 +176,20 @@ public class V1OnDiskFormat implements OnDiskFormat
                     else
                         SAICodecUtils.validate(input);
                 }
-                catch (Throwable e)
+                catch (Exception e)
                 {
-                    logger.error(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}. Error: {}"),
-                                 checksum ? "Checksum validation" : 
"Validation",
-                                 indexComponent,
-                                 indexDescriptor.sstableDescriptor,
-                                 e);
-                    return false;
+                    logger.warn(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}."),
+                                                           checksum ? 
"Checksum validation" : "Validation",
+                                                           indexComponent,
+                                                           
indexDescriptor.sstableDescriptor);
+                    rethrowIOException(e);
                 }
             }
         }
-        return true;
     }
 
     @Override
-    public boolean validatePerColumnIndexComponents(IndexDescriptor 
indexDescriptor, IndexContext indexContext, boolean checksum)
+    public void validatePerColumnIndexComponents(IndexDescriptor 
indexDescriptor, IndexContext indexContext, boolean checksum)
     {
         for (IndexComponent indexComponent : 
perColumnIndexComponents(indexContext))
         {
@@ -202,20 +202,25 @@ public class V1OnDiskFormat implements OnDiskFormat
                     else
                         SAICodecUtils.validate(input);
                 }
-                catch (Throwable e)
+                catch (Exception e)
                 {
-                    if (logger.isDebugEnabled())
-                    {
-                        logger.debug(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}"),
-                                     (checksum ? "Checksum validation" : 
"Validation"),
-                                     indexComponent,
-                                     indexDescriptor.sstableDescriptor);
-                    }
-                    return false;
+                    logger.warn(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}"),
+                                                           checksum ? 
"Checksum validation" : "Validation",
+                                                           indexComponent,
+                                                           
indexDescriptor.sstableDescriptor);
+                    rethrowIOException(e);
                 }
             }
         }
-        return true;
+    }
+
+    private static void rethrowIOException(Exception e)
+    {
+        if (e instanceof IOException)
+            throw new UncheckedIOException((IOException) e);
+        if (e.getCause() instanceof IOException)
+            throw new UncheckedIOException((IOException) e.getCause());
+        throw Throwables.unchecked(e);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java 
b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 4668844101..1ae3a04943 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -222,6 +222,7 @@ public class SASIIndex implements Index, 
INotificationConsumer
         };
     }
 
+    @Override
     public boolean shouldBuildBlocking()
     {
         return true;
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index c9cc2b1c2d..05140632f9 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -207,8 +207,8 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
     public SSTableReader finish(boolean openResult)
     {
         this.setOpenResult(openResult);
-        txnProxy.finish();
         observers.forEach(SSTableFlushObserver::complete);
+        txnProxy.finish();
         return finished();
     }
 
@@ -232,12 +232,15 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
     {
         try
         {
-            return txnProxy.commit(accumulate);
+            observers.forEach(SSTableFlushObserver::complete);
         }
-        finally
+        catch (Throwable t)
         {
-            observers.forEach(SSTableFlushObserver::complete);
+            // Return without advancing to COMMITTED, which will trigger 
abort() when the Transactional closes...
+            return Throwables.merge(accumulate, t);
         }
+
+        return txnProxy.commit(accumulate);
     }
 
     public final Throwable abort(Throwable accumulate)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java
new file mode 100644
index 0000000000..6ce6bb579e
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.index.sai.StorageAttachedIndexBuilder;
+import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.IndexInput;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class ImportIndexedSSTablesTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void startup() throws IOException
+    {
+        cluster = init(Cluster.build(2).withConfig(c -> c.with(NETWORK, 
GOSSIP))
+                                       
.withInstanceInitializer(ByteBuddyHelper::installErrors)
+                                       .start());
+
+        cluster.disableAutoCompaction(KEYSPACE);
+    }
+
+    @AfterClass
+    public static void shutdown()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Test
+    public void testIndexBuildingFailureDuringImport()
+    {
+        String table = "fail_during_import_test";
+        cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY 
KEY, v text)", KEYSPACE, table));
+
+        IInvokableInstance first = cluster.get(1);
+        first.runOnInstance(()-> ByteBuddyHelper.failValidation = false);
+        first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = true);
+
+        first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES 
(?, ?)", KEYSPACE, table), 1, "v1");
+        first.flush(KEYSPACE);
+
+        Object[][] rs = first.executeInternal(String.format("SELECT pk FROM 
%s.%s WHERE pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(1);
+
+        first.runOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe());
+
+        String indexName = table + "_v_index";
+        cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 
'sai'", indexName, KEYSPACE, table));
+        SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName);
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+
+        first.runOnInstance(() ->
+                assertThatThrownBy(() ->
+                        ColumnFamilyStore.loadNewSSTables(KEYSPACE, 
table)).hasRootCauseExactlyInstanceOf(CompactionInterruptedException.class));
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+    }
+
+    @Test
+    public void testImportBuildsSSTableIndexes()
+    {
+        String table = "import_build_test";
+        cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY 
KEY, v text)", KEYSPACE, table));
+
+        IInvokableInstance first = cluster.get(1);
+        first.runOnInstance(()-> ByteBuddyHelper.failValidation = false);
+        first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = false);
+
+        first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES 
(?, ?)", KEYSPACE, table), 1, "v1");
+        first.flush(KEYSPACE);
+
+        Object[][] rs = first.executeInternal(String.format("SELECT pk FROM 
%s.%s WHERE pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(1);
+
+        first.runOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe());
+
+        String indexName = table + "_v_index";
+        cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 
'sai'", indexName, KEYSPACE, table));
+        SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName);
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+
+        first.runOnInstance(() -> ColumnFamilyStore.loadNewSSTables(KEYSPACE, 
table));
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(1);
+        assertThat(rs[0][0]).isEqualTo(1);
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(1);
+        assertThat(rs[0][0]).isEqualTo(1);
+    }
+
+    @Test
+    public void testValidationFailureDuringImport()
+    {
+        String table = "validation_failure_test";
+        cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY 
KEY, v text)", KEYSPACE, table));
+
+        IInvokableInstance first = cluster.get(1);
+        first.runOnInstance(()-> ByteBuddyHelper.failValidation = true);
+        first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = false);
+
+        first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES 
(?, ?)", KEYSPACE, table), 1, "v1");
+        first.flush(KEYSPACE);
+
+        Object[][] rs = first.executeInternal(String.format("SELECT pk FROM 
%s.%s WHERE pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(1);
+
+        String indexName = table + "_v_index";
+        cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 
'sai'", indexName, KEYSPACE, table));
+        SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName);
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(1);
+        assertThat(rs[0][0]).isEqualTo(1);
+
+        first.runOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe());
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+
+        first.runOnInstance(() ->
+                assertThatThrownBy(() ->
+                        ColumnFamilyStore.loadNewSSTables(KEYSPACE, 
table)).hasRootCauseExactlyInstanceOf(CorruptIndexException.class));
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+    }
+
+    @Test
+    public void testImportIncludesExistingSSTableIndexes()
+    {
+        String table = "existing_indexes_test";
+        cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY 
KEY, v text)", KEYSPACE, table));
+
+        IInvokableInstance first = cluster.get(1);
+        first.runOnInstance(()-> ByteBuddyHelper.failValidation = false);
+        first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = false);
+
+        first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES 
(?, ?)", KEYSPACE, table), 1, "v1");
+        first.flush(KEYSPACE);
+
+        Object[][] rs = first.executeInternal(String.format("SELECT pk FROM 
%s.%s WHERE pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(1);
+
+        String indexName = table + "_v_index";
+        cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 
'sai'", indexName, KEYSPACE, table));
+        SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName);
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(1);
+        assertThat(rs[0][0]).isEqualTo(1);
+
+        first.runOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe());
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+
+        first.runOnInstance(() -> ColumnFamilyStore.loadNewSSTables(KEYSPACE, 
table));
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(1);
+        assertThat(rs[0][0]).isEqualTo(1);
+
+        rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v 
= ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(1);
+        assertThat(rs[0][0]).isEqualTo(1);
+    }
+
+    public static class ByteBuddyHelper
+    {
+        static volatile boolean interruptBuild = false;
+        static volatile boolean failValidation = false;
+
+        @SuppressWarnings("resource")
+        static void installErrors(ClassLoader loader, int node)
+        {
+            new ByteBuddy().rebase(StorageAttachedIndexBuilder.class)
+                           .method(named("isStopRequested"))
+                           
.intercept(MethodDelegation.to(ByteBuddyHelper.class))
+                           .make()
+                           .load(loader, 
ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(SAICodecUtils.class)
+                           .method(named("validateChecksum"))
+                           
.intercept(MethodDelegation.to(ByteBuddyHelper.class))
+                           .make()
+                           .load(loader, 
ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static boolean isStopRequested(@SuperCall Callable<Boolean> 
zuper)
+        {
+            if (interruptBuild)
+                return true;
+
+            try
+            {
+                return zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+        }
+
+        @SuppressWarnings("unused")
+        public static void validateChecksum(IndexInput input, @SuperCall 
Callable<Void> zuper) throws IOException
+        {
+            if (failValidation)
+                throw new CorruptIndexException("Injected failure!", "Test 
resource");
+
+            try
+            {
+                zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java
new file mode 100644
index 0000000000..c3ff4a528f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.index.sai.IndexContext;
+import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
+import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils;
+import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
+import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.IndexInput;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class IndexStreamingFailureTest extends TestBaseImpl
+{
+    public static final String TEST_ERROR_MESSAGE = "Injected failure!";
+    
+    private static Cluster cluster;
+    
+    @BeforeClass
+    public static void startup() throws IOException
+    {
+        cluster = init(Cluster.build(2).withConfig(c -> c.with(NETWORK, 
GOSSIP))
+                                       
.withInstanceInitializer(ByteBuddyHelper::installErrors)
+                                       .start());
+
+        cluster.disableAutoCompaction(KEYSPACE);
+    }
+
+    @AfterClass
+    public static void shutdown()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Test
+    public void testAvailabilityAfterFailedNonEntireFileStreaming() throws 
Exception
+    {
+        cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failFlush = true);
+        cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failValidation = 
false);
+        testAvailabilityAfterStreaming("non_entire_file_test", false);
+    }
+
+    @Test
+    public void testAvailabilityAfterFailedEntireFileStreaming() throws 
Exception
+    {
+        cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failFlush = false);
+        cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failValidation = 
true);
+        testAvailabilityAfterStreaming("entire_file_test", true);
+    }
+
+    private void testAvailabilityAfterStreaming(String table, boolean 
streamEntireSSTables) throws Exception
+    {
+        String indexName = table + "_v_index";
+        cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY 
KEY, v text)", KEYSPACE, table));
+        cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 
'sai'", indexName, KEYSPACE, table));
+        SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName);
+
+        IInvokableInstance first = cluster.get(1);
+        IInvokableInstance second = cluster.get(2);
+        first.runOnInstance(()-> 
DatabaseDescriptor.setStreamEntireSSTables(streamEntireSSTables));
+        second.runOnInstance(()-> 
DatabaseDescriptor.setStreamEntireSSTables(streamEntireSSTables));
+
+        first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES 
(?, ?)", KEYSPACE, table), 1, "v1");
+        first.flush(KEYSPACE);
+
+        Object[][] rs = second.executeInternal(String.format("SELECT pk FROM 
%s.%s WHERE v = ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+
+        // The repair job should fail when index completion fails. This should 
also fail the streaming transaction.
+        long mark = second.logs().mark();
+        second.nodetoolResult("repair", KEYSPACE).asserts().failure();
+        assertFalse("There should be an injected failure in the logs.", 
second.logs().grep(mark, TEST_ERROR_MESSAGE).getResult().isEmpty());
+
+        // The SSTable should not be added to the table view, as the streaming 
transaction failed...
+        rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+
+        // ...and querying the index also returns nothing, as the index for 
the streamed SSTable was never built.
+        rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
v = ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+
+        // On restart, ensure that the index remains queryable and does not 
include the data we attempted to stream. 
+        second.shutdown().get();
+        second.startup();
+
+        // On restart, the base table should be unchanged...
+        rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
pk = ?", KEYSPACE, table), 1);
+        assertThat(rs.length).isEqualTo(0);
+
+        // ...and the index should remain queryable, because from its 
perspective, the streaming never happened.
+        rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
v = ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(0);
+
+        // Disable failure injection, and verify that the index is queryable 
and has the newly streamed data:
+        second.runOnInstance(()-> ByteBuddyHelper.failFlush = false);
+        second.runOnInstance(()-> ByteBuddyHelper.failValidation = false);
+        second.nodetoolResult("repair", KEYSPACE).asserts().success();
+
+        rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE 
v = ?", KEYSPACE, table), "v1");
+        assertThat(rs.length).isEqualTo(1);
+    }
+
+    public static class ByteBuddyHelper
+    {
+        volatile static boolean failFlush = false;
+        volatile static boolean failValidation = false;
+        
+        @SuppressWarnings("resource")
+        static void installErrors(ClassLoader loader, int node)
+        {
+            if (node == 2)
+            {
+                new ByteBuddy().rebase(SegmentBuilder.class)
+                               .method(named("flush"))
+                               
.intercept(MethodDelegation.to(ByteBuddyHelper.class))
+                               .make()
+                               .load(loader, 
ClassLoadingStrategy.Default.INJECTION);
+
+                new ByteBuddy().rebase(SAICodecUtils.class)
+                               .method(named("validateChecksum"))
+                               
.intercept(MethodDelegation.to(ByteBuddyHelper.class))
+                               .make()
+                               .load(loader, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        @SuppressWarnings("unused")
+        public static SegmentMetadata flush(IndexDescriptor indexDescriptor, 
IndexContext indexContext, @SuperCall Callable<SegmentMetadata> zuper) throws 
IOException
+        {
+            if (failFlush)
+                throw new IOException(TEST_ERROR_MESSAGE);
+
+            try
+            {
+                return zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+        }
+
+        @SuppressWarnings("unused")
+        public static void validateChecksum(IndexInput input, @SuperCall 
Callable<Void> zuper) throws IOException
+        {
+            if (failValidation)
+                throw new CorruptIndexException(TEST_ERROR_MESSAGE, "Test 
resource");
+
+            try
+            {
+                zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/index/sai/SAITester.java 
b/test/unit/org/apache/cassandra/index/sai/SAITester.java
index 4f147dc4b8..35c4c6c5d6 100644
--- a/test/unit/org/apache/cassandra/index/sai/SAITester.java
+++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java
@@ -693,7 +693,7 @@ public abstract class SAITester extends CQLTester
         }
     }
 
-    protected void assertNumRows(int expected, String query, Object... args) 
throws Throwable
+    protected void assertNumRows(int expected, String query, Object... args)
     {
         ResultSet rs = executeNet(String.format(query, args));
         assertEquals(expected, rs.all().size());
diff --git 
a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java 
b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
index 43635c2e7a..58d530feff 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java
@@ -133,7 +133,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void shouldFailUnsupportedType() throws Throwable
+    public void shouldFailUnsupportedType()
     {
         for (CQL3Type.Native cql3Type : CQL3Type.Native.values())
         {
@@ -229,7 +229,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void shouldNotFailCreateWithTupleType() throws Throwable
+    public void shouldNotFailCreateWithTupleType()
     {
         createTable("CREATE TABLE %s (id text PRIMARY KEY, val tuple<text, 
int, double>)");
 
@@ -485,7 +485,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void shouldFailCreationMultipleIndexesOnSimpleColumn() throws 
Throwable
+    public void shouldFailCreationMultipleIndexesOnSimpleColumn()
     {
         createTable("CREATE TABLE %s (id int PRIMARY KEY, v1 TEXT)");
         execute("INSERT INTO %s (id, v1) VALUES(1, '1')");
@@ -514,7 +514,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void shouldIndexBuildingWithInMemoryData() throws Throwable
+    public void shouldIndexBuildingWithInMemoryData()
     {
         createTable(CREATE_TABLE_TEMPLATE);
 
@@ -644,7 +644,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void shouldCreateIndexFilesAfterMultipleConcurrentIndexCreation() 
throws Throwable
+    public void shouldCreateIndexFilesAfterMultipleConcurrentIndexCreation()
     {
         createTable(CREATE_TABLE_TEMPLATE);
         verifyNoIndexFiles();
@@ -671,7 +671,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void shouldCreateIndexFilesAfterMultipleSequentialIndexCreation() 
throws Throwable
+    public void shouldCreateIndexFilesAfterMultipleSequentialIndexCreation()
     {
         createTable(CREATE_TABLE_TEMPLATE);
         verifyNoIndexFiles();
@@ -703,7 +703,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void shouldReleaseIndexFilesAfterCompaction() throws Throwable
+    public void shouldReleaseIndexFilesAfterCompaction()
     {
         createTable(CREATE_TABLE_TEMPLATE);
         disableCompaction(KEYSPACE);
@@ -741,18 +741,18 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void truncateWithBuiltIndexes() throws Throwable
+    public void truncateWithBuiltIndexes()
     {
         verifyTruncateWithIndex(false);
     }
 
     @Test
-    public void concurrentTruncateWithIndexBuilding() throws Throwable
+    public void concurrentTruncateWithIndexBuilding()
     {
         verifyTruncateWithIndex(true);
     }
 
-    private void verifyTruncateWithIndex(boolean concurrentTruncate) throws 
Throwable
+    private void verifyTruncateWithIndex(boolean concurrentTruncate)
     {
         createTable(CREATE_TABLE_TEMPLATE);
 
@@ -881,8 +881,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
         boolean expectedLiteralState = !failedStringIndex || 
isBuildCompletionMarker(component);
 
         assertEquals("Checksum verification for " + component + " should be " 
+ expectedNumericState + " but was " + !expectedNumericState,
-                     expectedNumericState,
-                     verifyChecksum(numericIndexContext));
+                     expectedNumericState, 
verifyChecksum(numericIndexContext));
         assertEquals(expectedLiteralState, verifyChecksum(stringIndexContext));
 
         if (rebuild)
@@ -1006,7 +1005,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void verifyFlushAndCompactEmptyIndex() throws Throwable
+    public void verifyFlushAndCompactEmptyIndex()
     {
         createTable(CREATE_TABLE_TEMPLATE);
         disableCompaction(KEYSPACE);
@@ -1042,7 +1041,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void verifyFlushAndCompactNonIndexableRows() throws Throwable
+    public void verifyFlushAndCompactNonIndexableRows()
     {
         // valid row ids, but no valid indexable content
         Runnable populateData = () -> {
@@ -1065,7 +1064,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
     }
 
     @Test
-    public void verifyFlushAndCompactTombstones() throws Throwable
+    public void verifyFlushAndCompactTombstones()
     {
         // no valid row ids
         Runnable populateData = () -> {
@@ -1086,7 +1085,7 @@ public class StorageAttachedIndexDDLTest extends SAITester
         verifyFlushAndCompactEmptyIndexes(populateData);
     }
 
-    private void verifyFlushAndCompactEmptyIndexes(Runnable populateData) 
throws Throwable
+    private void verifyFlushAndCompactEmptyIndexes(Runnable populateData)
     {
         createTable(CREATE_TABLE_TEMPLATE);
         disableCompaction(KEYSPACE);
diff --git 
a/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java
 
b/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java
index 378bbd4391..4b98a94671 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java
@@ -18,15 +18,19 @@
 package org.apache.cassandra.index.sai.metrics;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import org.apache.cassandra.utils.FBUtilities;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.exceptions.ReadFailureException;
 import org.apache.cassandra.config.StorageAttachedIndexOptions;
 import org.apache.cassandra.index.sai.SAITester;
 import org.apache.cassandra.index.sai.disk.v1.SSTableComponentsWriter;
@@ -90,7 +94,7 @@ public abstract class SegmentFlushingFailureTester extends 
SAITester
     protected abstract long expectedBytesLimit();
 
     @Test
-    public void testSegmentMemoryTrackerLifecycle() throws Throwable
+    public void testSegmentMemoryTrackerLifecycle()
     {
         createTable(CREATE_TABLE_TEMPLATE);
         createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
@@ -157,9 +161,10 @@ public abstract class SegmentFlushingFailureTester extends 
SAITester
         // Verify that we abort exactly once and zero the memory tracker:
         verifyCompactionIndexBuilds(1, failure, currentTable());
 
+        // We should still be able to query the index if compaction is aborted:
         String select = String.format("SELECT * FROM %%s WHERE %s = %s", 
column, column.equals("v1") ? "0" : "'0'");
-
-        assertThatThrownBy(() -> 
executeNet(select)).isInstanceOf(ReadFailureException.class);
+        ResultSet rows = executeNet(select);
+        assertEquals(1, rows.all().size());
     }
 
     @Test
@@ -187,11 +192,11 @@ public abstract class SegmentFlushingFailureTester 
extends SAITester
         // Start compaction against both tables/indexes and verify that they 
are aborted safely:
         verifyCompactionIndexBuilds(2, segmentFlushFailure, table1, table2);
 
-        assertThatThrownBy(() -> executeNet(String.format("SELECT * FROM %s 
WHERE v1 = 0", KEYSPACE + "." + table1)))
-        .isInstanceOf(ReadFailureException.class);
-
-        assertThatThrownBy(() -> executeNet(String.format("SELECT * FROM %s 
WHERE v1 = 0", KEYSPACE + "." + table2)))
-        .isInstanceOf(ReadFailureException.class);
+        // We should still be able to query the indexes if compaction is 
aborted:
+        ResultSet rows = executeNet(String.format("SELECT * FROM %s WHERE v1 = 
0", KEYSPACE + "." + table1));
+        assertEquals(1, rows.all().size());
+        rows = executeNet(String.format("SELECT * FROM %s WHERE v1 = 0", 
KEYSPACE + "." + table2));
+        assertEquals(1, rows.all().size());
     }
 
     private void verifyCompactionIndexBuilds(int aborts, Injection failure, 
String... tables) throws Throwable
@@ -201,7 +206,14 @@ public abstract class SegmentFlushingFailureTester extends 
SAITester
 
         try
         {
-            Arrays.stream(tables).forEach(table -> compact(KEYSPACE, table));
+            ExecutorService executor = 
Executors.newFixedThreadPool(tables.length);
+            List<Future<?>> results = new ArrayList<>();
+
+            for (String table : tables)
+                results.add(executor.submit(() -> compact(KEYSPACE, table)));
+            
+            assertThatThrownBy(() -> 
FBUtilities.waitOnFutures(results)).hasRootCauseMessage("Injected failure!");
+            executor.shutdownNow();
 
             Assert.assertEquals(aborts, writerAbortCounter.get());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to