This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 98fefc1557efa7123bab19eb5f74f04964aa02c1
Author: Piotr Kołaczkowski <pkola...@datastax.com>
AuthorDate: Thu Jun 15 15:07:18 2023 +0200

    Stream all compatible components registered by an SSTable
    
    patch by Piotr Kołaczkowski; reviewed by Andrés de la Peña and Caleb Rackliffe for CASSANDRA-18345
---
 .../CassandraEntireSSTableStreamReader.java        |   2 +-
 .../db/streaming/CassandraOutgoingFile.java        |   4 +-
 .../cassandra/db/streaming/ComponentContext.java   |   6 +-
 .../cassandra/db/streaming/ComponentManifest.java  |  12 +-
 .../index/sai/disk/format/IndexComponent.java      |  18 +++
 .../cassandra/index/sai/disk/format/Version.java   |   9 +-
 .../index/sai/disk/v1/V1OnDiskFormat.java          |  14 +-
 .../org/apache/cassandra/io/sstable/Component.java |  26 ++--
 .../org/apache/cassandra/io/sstable/SSTable.java   |  11 ++
 .../io/sstable/SSTableZeroCopyWriter.java          |  26 ++--
 .../org/apache/cassandra/io/sstable/SSTable_API.md |  14 +-
 .../cassandra/io/sstable/format/SSTableFormat.java |  19 ++-
 .../cassandra/io/sstable/format/big/BigFormat.java |  20 +--
 .../cassandra/io/sstable/format/bti/BtiFormat.java |  19 +--
 .../distributed/test/sai/IndexStreamingTest.java   | 155 +++++++++++++++++++++
 .../microbench/ZeroCopyStreamingBenchmark.java     |   2 +-
 .../CassandraEntireSSTableStreamWriterTest.java    |   4 +-
 .../db/streaming/CassandraStreamHeaderTest.java    |   4 +-
 .../apache/cassandra/io/sstable/ComponentTest.java |  20 +--
 .../io/sstable/SSTableZeroCopyWriterTest.java      |   2 +-
 ...ntireSSTableStreamingCorrectFilesCountTest.java |   2 +-
 21 files changed, 278 insertions(+), 111 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 9c5e048a12..98e2b6f7ef 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -121,7 +121,7 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
                              prettyPrintMemory(bytesRead),
                              prettyPrintMemory(totalSize));
 
-                writer.writeComponent(component.type, in, length);
+                writer.writeComponent(component, in, length);
                 
session.progress(writer.descriptor.fileFor(component).toString(), 
ProgressInfo.Direction.IN, length, length, length);
                 bytesRead += length;
 
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 73abbe8846..7572749d37 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -65,7 +65,7 @@ public class CassandraOutgoingFile implements OutgoingStream
 
         this.filename = sstable.getFilename();
         this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
-        ComponentManifest manifest = 
ComponentManifest.create(sstable.descriptor);
+        ComponentManifest manifest = ComponentManifest.create(sstable);
         this.header = makeHeader(sstable, operation, sections, estimatedKeys, 
shouldStreamEntireSSTable, manifest);
     }
 
@@ -154,7 +154,7 @@ public class CassandraOutgoingFile implements OutgoingStream
             // redistribution, otherwise file sizes recorded in component 
manifest will be different from actual
             // file sizes.
             // Recreate the latest manifest and hard links for mutatable 
components in case they are modified.
-            try (ComponentContext context = sstable.runWithLock(ignored -> 
ComponentContext.create(sstable.descriptor)))
+            try (ComponentContext context = sstable.runWithLock(ignored -> 
ComponentContext.create(sstable)))
             {
                 CassandraStreamHeader current = makeHeader(sstable, operation, 
sections, estimatedKeys, true, context.manifest());
                 CassandraStreamHeader.serializer.serialize(current, out, 
version);
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java 
b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
index 164dd6ba5c..c03e7b4c34 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
 
@@ -44,8 +45,9 @@ public class ComponentContext implements AutoCloseable
         this.manifest = manifest;
     }
 
-    public static ComponentContext create(Descriptor descriptor)
+    public static ComponentContext create(SSTable sstable)
     {
+        Descriptor descriptor = sstable.descriptor;
         Map<Component, File> hardLinks = new HashMap<>(1);
 
         for (Component component : descriptor.getFormat().mutableComponents())
@@ -59,7 +61,7 @@ public class ComponentContext implements AutoCloseable
             hardLinks.put(component, hardlink);
         }
 
-        return new ComponentContext(hardLinks, 
ComponentManifest.create(descriptor));
+        return new ComponentContext(hardLinks, 
ComponentManifest.create(sstable));
     }
 
     public ComponentManifest manifest()
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
index 5e3cc0c6b6..f220be3a48 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
@@ -33,7 +34,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -52,13 +53,14 @@ public final class ComponentManifest implements 
Iterable<Component>
     }
 
     @VisibleForTesting
-    public static ComponentManifest create(Descriptor descriptor)
+    public static ComponentManifest create(SSTable sstable)
     {
-        LinkedHashMap<Component, Long> components = new 
LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
+        Set<Component> streamingComponents = sstable.getStreamingComponents();
+        LinkedHashMap<Component, Long> components = new 
LinkedHashMap<>(streamingComponents.size());
 
-        for (Component component : 
descriptor.getFormat().streamingComponents())
+        for (Component component : streamingComponents)
         {
-            File file = descriptor.fileFor(component);
+            File file = sstable.descriptor.fileFor(component);
             if (!file.exists())
                 continue;
 
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java
index 670ba82c09..4d802ccae9 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java
@@ -18,8 +18,15 @@
 
 package org.apache.cassandra.index.sai.disk.format;
 
+import java.util.regex.Pattern;
+
 import org.apache.cassandra.index.sai.disk.v1.postings.PostingsWriter;
 import org.apache.cassandra.index.sai.disk.v1.trie.TrieTermsDictionaryWriter;
+import org.apache.cassandra.io.sstable.Component;
+
+import static 
org.apache.cassandra.index.sai.disk.format.Version.SAI_DESCRIPTOR;
+import static org.apache.cassandra.index.sai.disk.format.Version.SAI_SEPARATOR;
+
 
 /**
  * This is a definitive list of all the on-disk components for all versions
@@ -80,9 +87,20 @@ public enum IndexComponent
     GROUP_COMPLETION_MARKER("GroupComplete");
 
     public final String name;
+    public final Component.Type type;
 
     IndexComponent(String name)
     {
         this.name = name;
+        this.type = componentType(name);
+    }
+
+    private static Component.Type componentType(String name)
+    {
+        String componentName = SAI_DESCRIPTOR + SAI_SEPARATOR + name;
+        String repr = Pattern.quote(SAI_DESCRIPTOR + SAI_SEPARATOR)
+                      + ".*"
+                      + Pattern.quote(SAI_SEPARATOR + name + ".db");
+        return Component.Type.create(componentName, repr, true, null);
     }
 }
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java 
b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
index 16e84d94c7..8dd57581a3 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.index.sai.IndexContext;
 import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 
 /**
  * Format version of indexing component, denoted as [major][minor]. Same 
forward-compatibility rules apply as to
@@ -37,8 +36,8 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
  */
 public class Version implements Comparable<Version>
 {
-    private static final String SAI_DESCRIPTOR = "SAI";
-    private static final String SAI_SEPARATOR = "+";
+    public static final String SAI_DESCRIPTOR = "SAI";
+    public static final String SAI_SEPARATOR = "+";
 
     // Current version
     public static final Version AA = new Version("aa", 
V1OnDiskFormat.instance, (c, i) -> defaultFileNameFormat(c, i, "aa"));
@@ -113,12 +112,12 @@ public class Version implements Comparable<Version>
 
     public Component makePerSSTableComponent(IndexComponent indexComponent)
     {
-        return 
SSTableFormat.Components.Types.CUSTOM.createComponent(fileNameFormatter.format(indexComponent,
 null));
+        return 
indexComponent.type.createComponent(fileNameFormatter.format(indexComponent, 
null));
     }
 
     public Component makePerIndexComponent(IndexComponent indexComponent, 
IndexContext indexContext)
     {
-        return 
SSTableFormat.Components.Types.CUSTOM.createComponent(fileNameFormatter.format(indexComponent,
 indexContext));
+        return 
indexComponent.type.createComponent(fileNameFormatter.format(indexComponent, 
indexContext));
     }
 
     public FileNameFormatter fileNameFormatter()
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index c55d4b07ef..189d9b6a5f 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,13 +55,15 @@ public class V1OnDiskFormat implements OnDiskFormat
 {
     private static final Logger logger = 
LoggerFactory.getLogger(V1OnDiskFormat.class);
 
-    private static final Set<IndexComponent> PER_SSTABLE_COMPONENTS = 
EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
+    @VisibleForTesting
+    public static final Set<IndexComponent> PER_SSTABLE_COMPONENTS = 
EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
                                                                                
  IndexComponent.GROUP_META,
                                                                                
  IndexComponent.TOKEN_VALUES,
                                                                                
  IndexComponent.PRIMARY_KEY_TRIE,
                                                                                
  IndexComponent.PRIMARY_KEY_BLOCKS,
                                                                                
  IndexComponent.PRIMARY_KEY_BLOCK_OFFSETS);
-    private static final Set<IndexComponent> LITERAL_COMPONENTS = 
EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
+    @VisibleForTesting
+    public static final Set<IndexComponent> LITERAL_COMPONENTS = 
EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
                                                                              
IndexComponent.META,
                                                                              
IndexComponent.TERMS_DATA,
                                                                              
IndexComponent.POSTING_LISTS);
@@ -170,10 +173,11 @@ public class V1OnDiskFormat implements OnDiskFormat
                 {
                     if (logger.isDebugEnabled())
                     {
-                        logger.debug(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}"),
-                                     (checksum ? "Checksum validation" : 
"Validation"),
+                        logger.debug(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}. Error: {}"),
+                                     checksum ? "Checksum validation" : 
"Validation",
                                      indexComponent,
-                                     indexDescriptor.sstableDescriptor);
+                                     indexDescriptor.sstableDescriptor,
+                                     e);
                     }
                     return false;
                 }
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java 
b/src/java/org/apache/cassandra/io/sstable/Component.java
index f2eea992a3..0d89cf0b92 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -52,6 +52,7 @@ public class Component
         public final int id;
         public final String name;
         public final String repr;
+        public final boolean streamable;
         private final Component singleton;
 
         @SuppressWarnings("rawtypes")
@@ -60,31 +61,34 @@ public class Component
         /**
          * Creates a new non-singleton type and registers it a global type 
registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all 
parent formats
-         * @param repr        the regular expression to be used to recognize a 
name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all 
parent formats
+         * @param repr         the regular expression to be used to recognize 
a name represents this type
+         * @param streamable   whether components of this type should be 
streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type create(String name, String repr, Class<? extends 
SSTableFormat<?, ?>> formatClass)
+        public static Type create(String name, String repr, boolean 
streamable, Class<? extends SSTableFormat<?, ?>> formatClass)
         {
-            return new Type(name, repr, false, formatClass);
+            return new Type(name, repr, false, streamable, formatClass);
         }
 
         /**
          * Creates a new singleton type and registers it in a global type 
registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all 
parent formats
-         * @param repr        the regular expression to be used to recognize a 
name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all 
parent formats
+         * @param repr         the regular expression to be used to recognize 
a name represents this type
+         * @param streamable   whether components of this type should be 
streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type createSingleton(String name, String repr, Class<? 
extends SSTableFormat<?, ?>> formatClass)
+        public static Type createSingleton(String name, String repr, boolean 
streamable, Class<? extends SSTableFormat<?, ?>> formatClass)
         {
-            return new Type(name, repr, true, formatClass);
+            return new Type(name, repr, true, streamable, formatClass);
         }
 
-        private Type(String name, String repr, boolean isSingleton, Class<? 
extends SSTableFormat<?, ?>> formatClass)
+        private Type(String name, String repr, boolean isSingleton, boolean 
streamable, Class<? extends SSTableFormat<?, ?>> formatClass)
         {
             this.name = Objects.requireNonNull(name);
             this.repr = repr;
+            this.streamable = streamable;
             this.id = typesCollector.size();
             this.formatClass = formatClass == null ? SSTableFormat.class : 
formatClass;
             this.singleton = isSingleton ? new Component(this) : null;
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java 
b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 7936e2ea53..475f92beeb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -162,6 +163,16 @@ public abstract class SSTable
         return ImmutableSet.copyOf(components);
     }
 
+    /**
+     * Returns all SSTable components that should be streamed.
+     */
+    public Set<Component> getStreamingComponents()
+    {
+        return components.stream()
+                         .filter(c -> c.type.streamable)
+                         .collect(Collectors.toSet());
+    }
+
     public TableMetadata metadata()
     {
         return metadata.get();
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
index 6d088dfd2d..56c96e76ea 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
@@ -23,10 +23,10 @@ import java.nio.channels.ClosedChannelException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +50,7 @@ public class SSTableZeroCopyWriter extends SSTable implements 
SSTableMultiWriter
     private static final Logger logger = 
LoggerFactory.getLogger(SSTableZeroCopyWriter.class);
 
     private volatile SSTableReader finalReader;
-    private final Map<Component.Type, SequentialWriter> componentWriters;
+    private final Map<String, SequentialWriter> componentWriters; // indexed 
by component name
 
     public SSTableZeroCopyWriter(Builder<?, ?> builder,
                                  LifecycleNewTracker lifecycleNewTracker,
@@ -61,12 +61,14 @@ public class SSTableZeroCopyWriter extends SSTable 
implements SSTableMultiWriter
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if 
(!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component 
detected %s",
-                                            
Sets.difference(ImmutableSet.copyOf(components), 
descriptor.getFormat().streamingComponents())));
+        Set<Component> unsupported = components.stream()
+                                               .filter(c -> !c.type.streamable)
+                                               .collect(Collectors.toSet());
+        if (!unsupported.isEmpty())
+            throw new AssertionError(format("Unsupported streaming components 
detected: %s", unsupported));
 
         for (Component c : components)
-            componentWriters.put(c.type, makeWriter(descriptor, c));
+            componentWriters.put(c.name, makeWriter(descriptor, c));
     }
 
     @Override
@@ -195,14 +197,16 @@ public class SSTableZeroCopyWriter extends SSTable 
implements SSTableMultiWriter
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long 
size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long 
size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, 
componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        @SuppressWarnings({"resource", "RedundantSuppression"})  // all 
writers are closed in close()
+        SequentialWriter writer = componentWriters.get(component.name);
+        logger.info("Writing component {} to {} length {}", component, 
writer.getPath(), prettyPrintMemory(size));
 
         if (in instanceof AsyncStreamingInputPlus)
-            write((AsyncStreamingInputPlus) in, size, 
componentWriters.get(type));
+            write((AsyncStreamingInputPlus) in, size, writer);
         else
-            write(in, size, componentWriters.get(type));
+            write(in, size, writer);
     }
 
     private void write(AsyncStreamingInputPlus in, long size, SequentialWriter 
writer) throws ClosedChannelException
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable_API.md 
b/src/java/org/apache/cassandra/io/sstable/SSTable_API.md
index 6a0440655e..7fb23ca4d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable_API.md
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable_API.md
@@ -77,19 +77,19 @@ Apart from the generic components, each sstable format 
implementation may descri
 For example, the _big table_ format describes additionally `PRIMARY_INDEX` and 
`SUMMARY` singleton types and 
 the corresponding singleton components (see 
[`BigFormat.Components`](format/big/BigFormat.java)).
 
-Custom types can be created with one of the `Component.Type.create(name, repr, 
formatClass)`,
-`Component.Type.createSingleton(name, repr, formatClass)` methods. Each 
created type is registered in a global types'
-registry. Types registry is hierarchical which means that an sstable 
implementation may use types defined for its
-format class and for all parent format classes (for example, the types defined 
for the `BigFormat` class extend the set
-of types defined for the `SSTableFormat` interface).
+Custom types can be created with one of the `Component.Type.create(name, repr, 
streamable, formatClass)`,
+`Component.Type.createSingleton(name, repr, streamable, formatClass)` methods. 
Each created type is registered in 
+a global types' registry. Types registry is hierarchical which means that an 
sstable implementation may use types 
+defined for its format class and for all parent format classes (for example, 
the types defined for the `BigFormat` class
+extend the set of types defined for the `SSTableFormat` interface).
 
 For example, types defined for `BigFormat`:
 
 ```java
 public static class Types extends SSTableFormat.Components.Types
 {
-    public static final Component.Type PRIMARY_INDEX = 
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class);
-    public static final Component.Type SUMMARY = 
Component.Type.createSingleton("SUMMARY", "Summary.db", BigFormat.class);
+    public static final Component.Type PRIMARY_INDEX = 
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true, 
BigFormat.class);
+    public static final Component.Type SUMMARY = 
Component.Type.createSingleton("SUMMARY", "Summary.db", true, BigFormat.class);
 }
 ```
 
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 1caa87b9cc..654880c2c1 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -59,7 +59,6 @@ public interface SSTableFormat<R extends SSTableReader, W 
extends SSTableWriter>
      */
     Set<Component> allComponents();
 
-    Set<Component> streamingComponents();
 
     Set<Component> primaryComponents();
 
@@ -156,23 +155,23 @@ public interface SSTableFormat<R extends SSTableReader, W 
extends SSTableWriter>
         {
             // the base data for an sstable: the remaining components can be 
regenerated
             // based on the data component
-            public static final Component.Type DATA = 
Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = 
Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk 
offsets etc.
-            public static final Component.Type COMPRESSION_INFO = 
Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = 
Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, 
null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = 
Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = 
Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = 
Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = 
Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = 
Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = 
Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = 
Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = 
Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the 
sstable
-            public static final Component.Type TOC = 
Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = 
Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = 
Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = 
Component.Type.create("SECONDARY_INDEX", "SI_.*.db", false, null);
             // custom component, used by e.g. custom compaction strategy
-            public static final Component.Type CUSTOM = 
Component.Type.create("CUSTOM", null, null);
+            public static final Component.Type CUSTOM = 
Component.Type.create("CUSTOM", null, true, null);
         }
 
         // singleton components for types that don't need ids
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index f7f2e4d850..4de58b4b6b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -83,9 +83,9 @@ public class BigFormat extends 
AbstractSSTableFormat<BigTableReader, BigTableWri
         public static class Types extends SSTableFormat.Components.Types
         {
             // index of the row keys with pointers to their positions in the 
data file
-            public static final Component.Type PRIMARY_INDEX = 
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class);
+            public static final Component.Type PRIMARY_INDEX = 
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true, 
BigFormat.class);
             // holds SSTable Index Summary (sampling of Index component)
-            public static final Component.Type SUMMARY = 
Component.Type.createSingleton("SUMMARY", "Summary.db", BigFormat.class);
+            public static final Component.Type SUMMARY = 
Component.Type.createSingleton("SUMMARY", "Summary.db", true, BigFormat.class);
         }
 
         public final static Component PRIMARY_INDEX = 
Types.PRIMARY_INDEX.getSingleton();
@@ -110,16 +110,6 @@ public class BigFormat extends 
AbstractSSTableFormat<BigTableReader, BigTableWri
                                                                                
 SUMMARY,
                                                                                
 COMPRESSION_INFO,
                                                                                
 STATS);
-
-        private static final Set<Component> STREAM_COMPONENTS = 
ImmutableSet.of(DATA,
-                                                                               
 PRIMARY_INDEX,
-                                                                               
 STATS,
-                                                                               
 COMPRESSION_INFO,
-                                                                               
 FILTER,
-                                                                               
 SUMMARY,
-                                                                               
 DIGEST,
-                                                                               
 CRC);
-
         private static final Set<Component> ALL_COMPONENTS = 
ImmutableSet.of(DATA,
                                                                              
PRIMARY_INDEX,
                                                                              
STATS,
@@ -181,12 +171,6 @@ public class BigFormat extends 
AbstractSSTableFormat<BigTableReader, BigTableWri
         return Components.ALL_COMPONENTS;
     }
 
-    @Override
-    public Set<Component> streamingComponents()
-    {
-        return Components.STREAM_COMPONENTS;
-    }
-
     @Override
     public Set<Component> primaryComponents()
     {
diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java 
b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
index 0e9c447dc5..65d97d3d40 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
@@ -71,23 +71,14 @@ public class BtiFormat extends 
AbstractSSTableFormat<BtiTableReader, BtiTableWri
     {
         public static class Types extends 
AbstractSSTableFormat.Components.Types
         {
-            public static final Component.Type PARTITION_INDEX = 
Component.Type.createSingleton("PARTITION_INDEX", "Partitions.db", 
BtiFormat.class);
-            public static final Component.Type ROW_INDEX = 
Component.Type.createSingleton("ROW_INDEX", "Rows.db", BtiFormat.class);
+            public static final Component.Type PARTITION_INDEX = 
Component.Type.createSingleton("PARTITION_INDEX", "Partitions.db", true, 
BtiFormat.class);
+            public static final Component.Type ROW_INDEX = 
Component.Type.createSingleton("ROW_INDEX", "Rows.db", true, BtiFormat.class);
         }
 
         public final static Component PARTITION_INDEX = 
Types.PARTITION_INDEX.getSingleton();
 
         public final static Component ROW_INDEX = 
Types.ROW_INDEX.getSingleton();
 
-        private final static Set<Component> STREAMING_COMPONENTS = 
ImmutableSet.of(DATA,
-                                                                               
    PARTITION_INDEX,
-                                                                               
    ROW_INDEX,
-                                                                               
    STATS,
-                                                                               
    COMPRESSION_INFO,
-                                                                               
    FILTER,
-                                                                               
    DIGEST,
-                                                                               
    CRC);
-
         private final static Set<Component> PRIMARY_COMPONENTS = 
ImmutableSet.of(DATA,
                                                                                
  PARTITION_INDEX);
 
@@ -159,12 +150,6 @@ public class BtiFormat extends 
AbstractSSTableFormat<BtiTableReader, BtiTableWri
         return readerFactory;
     }
 
-    @Override
-    public Set<Component> streamingComponents()
-    {
-        return Components.STREAMING_COMPONENTS;
-    }
-
     @Override
     public Set<Component> primaryComponents()
     {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
new file mode 100644
index 0000000000..3069e807a8
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+    // all cases of the vtable - hence we add a big enough blob column
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount()
+    {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't 
include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> 
c.with(Feature.values())
+                                                             
.set("stream_entire_sstables", zeroCopyStreaming)
+                                                             
.set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace(
+                "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob) 
WITH compression = { 'enabled' : false };"
+            ));
+            cluster.schemaChange(withKeyspace(
+                "CREATE CUSTOM INDEX ON %s.test(v) USING 
'StorageAttachedIndex';"
+            ));
+            cluster.stream().forEach(i ->
+                i.nodetoolResult("disableautocompaction", 
KEYSPACE).asserts().success()
+            );
+            IInvokableInstance first = cluster.get(1);
+            IInvokableInstance second = cluster.get(2);
+            long sstableCount = 10;
+            long expectedFiles = zeroCopyStreaming ? sstableCount * 
NUM_COMPONENTS : sstableCount;
+            for (int i = 0; i < sstableCount; i++)
+            {
+                first.executeInternal(withKeyspace("insert into %s.test(pk, v, 
b) values (?, ?, ?)"), i, "v" + i, BLOB);
+                first.flush(KEYSPACE);
+            }
+
+            second.nodetoolResult("rebuild", "--keyspace", 
KEYSPACE).asserts().success();
+
+            SimpleQueryResult qr = first.executeInternalWithResult("SELECT * 
FROM system_views.streaming");
+            String txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found 
rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+            Row row = qr.next();
+            QueryResultUtil.assertThat(row)
+                           .isEqualTo("peers", 
Collections.singletonList(second.broadcastAddress().toString()))
+                           .isEqualTo("follower", true)
+                           .isEqualTo("operation", "Rebuild")
+                           .isEqualTo("status", "success")
+                           .isEqualTo("progress_percentage", 100.0F)
+                           .isEqualTo("success_message", 
null).isEqualTo("failure_cause", null)
+                           .isEqualTo("files_sent", expectedFiles)
+                           .columnsEqualTo("files_sent", "files_to_send")
+                           .columnsEqualTo("bytes_sent", "bytes_to_send")
+                           .isEqualTo("files_received", 0L)
+                           .columnsEqualTo("files_received", 
"files_to_receive", "bytes_received", "bytes_to_receive");
+            long totalBytes = row.getLong("bytes_sent");
+            assertThat(totalBytes).isGreaterThan(0);
+
+            qr = second.executeInternalWithResult("SELECT * FROM 
system_views.streaming");
+            txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found 
rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+
+            QueryResultUtil.assertThat(qr.next())
+                           .isEqualTo("peers", 
Collections.singletonList(first.broadcastAddress().toString()))
+                           .isEqualTo("follower", false)
+                           .isEqualTo("operation", "Rebuild")
+                           .isEqualTo("status", "success")
+                           .isEqualTo("progress_percentage", 100.0F)
+                           .isEqualTo("success_message", 
null).isEqualTo("failure_cause", null)
+                           .columnsEqualTo("files_to_receive", 
"files_received").isEqualTo("files_received", expectedFiles)
+                           .columnsEqualTo("bytes_to_receive", 
"bytes_received").isEqualTo("bytes_received", totalBytes)
+                           .columnsEqualTo("files_sent", "files_to_send", 
"bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L);
+
+            // did we trigger slow event log?
+            cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling 
streaming events took longer than").getResult())
+                                           .describedAs("Unable to find slow 
log for node%d", i.config().num())
+                                           .isNotEmpty());
+
+            for (int i = 0; i < sstableCount; i++)
+            {
+                Object[][] rs = second.executeInternal(withKeyspace("select pk 
from %s.test where v = ?"), "v" + i);
+                assertThat(rs.length).isEqualTo(1);
+                assertThat(rs[0][0]).isEqualTo(i);
+            }
+        }
+    }
+}
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
 
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index b3083eff32..c43648a3d1 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -121,7 +121,7 @@ public class ZeroCopyStreamingBenchmark
 
             sstable = store.getLiveSSTables().iterator().next();
             session = setupStreamingSessionForTest();
-            context = ComponentContext.create(sstable.descriptor);
+            context = ComponentContext.create(sstable);
             blockStreamWriter = new 
CassandraEntireSSTableStreamWriter(sstable, session, context);
 
             CapturingNettyChannel blockStreamCaptureChannel = new 
CapturingNettyChannel(STREAM_SIZE);
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index e3305f24b6..89a3e9ed36 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -117,7 +117,7 @@ public class CassandraEntireSSTableStreamWriterTest
 
         EmbeddedChannel channel = new EmbeddedChannel();
         try (AsyncStreamingOutputPlus out = new 
AsyncStreamingOutputPlus(channel);
-             ComponentContext context = ComponentContext.create(descriptor))
+             ComponentContext context = ComponentContext.create(sstable))
         {
             CassandraEntireSSTableStreamWriter writer = new 
CassandraEntireSSTableStreamWriter(sstable, session, context);
 
@@ -140,7 +140,7 @@ public class CassandraEntireSSTableStreamWriterTest
         ByteBuf serializedFile = Unpooled.buffer(8192);
         EmbeddedChannel channel = createMockNettyChannel(serializedFile);
         try (AsyncStreamingOutputPlus out = new 
AsyncStreamingOutputPlus(channel);
-             ComponentContext context = ComponentContext.create(descriptor))
+             ComponentContext context = ComponentContext.create(sstable))
         {
             CassandraEntireSSTableStreamWriter writer = new 
CassandraEntireSSTableStreamWriter(sstable, session, context);
             writer.write(out);
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java 
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
index 52e680f158..de953734ca 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -108,7 +108,7 @@ public class CassandraStreamHeaderTest
         // verify all component on-disk length is used for ZCS
         CassandraStreamHeader header = header(true, true);
         long transferedSize = header.size();
-        assertEquals(ComponentManifest.create(sstable.descriptor).totalSize(), 
transferedSize);
+        assertEquals(ComponentManifest.create(sstable).totalSize(), 
transferedSize);
         assertEquals(transferedSize, header.calculateSize());
 
         // verify that computing file chunks doesn't change transferred size for ZCS
@@ -142,7 +142,7 @@ public class CassandraStreamHeaderTest
 
         TableMetadata metadata = store.metadata();
         SerializationHeader.Component serializationHeader = 
SerializationHeader.makeWithoutStats(metadata).toComponent();
-        ComponentManifest componentManifest = entireSSTable ? 
ComponentManifest.create(sstable.descriptor) : null;
+        ComponentManifest componentManifest = entireSSTable ? 
ComponentManifest.create(sstable) : null;
         DecoratedKey firstKey = entireSSTable ? sstable.getFirst() : null;
 
         return CassandraStreamHeader.builder()
diff --git a/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java 
b/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java
index a9b76303f2..e58238c444 100644
--- a/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java
@@ -57,18 +57,18 @@ public class ComponentTest
         Function<Type, Component> componentFactory = 
Mockito.mock(Function.class);
 
         // do not allow to define a type with the same name or repr as the existing type for this or parent format
-        assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> 
Type.createSingleton(Components.Types.TOC.name, Components.Types.TOC.repr + 
"x", Format1.class));
-        assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> 
Type.createSingleton(Components.Types.TOC.name + "x", 
Components.Types.TOC.repr, Format2.class));
+        assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> 
Type.createSingleton(Components.Types.TOC.name, Components.Types.TOC.repr + 
"x", true, Format1.class));
+        assertThatExceptionOfType(AssertionError.class).isThrownBy(() -> 
Type.createSingleton(Components.Types.TOC.name + "x", 
Components.Types.TOC.repr, true, Format2.class));
 
         // allow to define a format with other name and repr
-        Type t1 = Type.createSingleton("ONE", "One.db", Format1.class);
+        Type t1 = Type.createSingleton("ONE", "One.db", true, Format1.class);
 
         // allow to define a format with the same name and repr for two different formats
-        Type t2f1 = Type.createSingleton("TWO", "Two.db", Format1.class);
-        Type t2f2 = Type.createSingleton("TWO", "Two.db", Format2.class);
+        Type t2f1 = Type.createSingleton("TWO", "Two.db", true, Format1.class);
+        Type t2f2 = Type.createSingleton("TWO", "Two.db", true, Format2.class);
         assertThat(t2f1).isNotEqualTo(t2f2);
 
-        assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> 
Type.createSingleton(null, "-Three.db", Format1.class));
+        assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> 
Type.createSingleton(null, "-Three.db", true, Format1.class));
 
         assertThat(Type.fromRepresentation("should be custom", 
BigFormat.getInstance())).isSameAs(Components.Types.CUSTOM);
         assertThat(Type.fromRepresentation(Components.Types.TOC.repr, 
BigFormat.getInstance())).isSameAs(Components.Types.TOC);
@@ -80,10 +80,10 @@ public class ComponentTest
     @Test
     public void testComponents()
     {
-        Type t3f1 = Type.createSingleton("THREE", "Three.db", Format1.class);
-        Type t3f2 = Type.createSingleton("THREE", "Three.db", Format2.class);
-        Type t4f1 = Type.create("FOUR", ".*-Four.db", Format1.class);
-        Type t4f2 = Type.create("FOUR", ".*-Four.db", Format2.class);
+        Type t3f1 = Type.createSingleton("THREE", "Three.db", true, 
Format1.class);
+        Type t3f2 = Type.createSingleton("THREE", "Three.db", true, 
Format2.class);
+        Type t4f1 = Type.create("FOUR", ".*-Four.db", true, Format1.class);
+        Type t4f2 = Type.create("FOUR", ".*-Four.db", true, Format2.class);
 
         Component c1 = t3f1.getSingleton();
         Component c2 = t3f2.getSingleton();
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java
index 11b92ac499..4d64a73602 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java
@@ -168,7 +168,7 @@ public class SSTableZeroCopyWriterTest
 
                 try
                 {
-                    btzcw.writeComponent(component.type, pair.left, 
pair.right);
+                    btzcw.writeComponent(component, pair.left, pair.right);
                 }
                 catch (ClosedChannelException e)
                 {
diff --git 
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
 
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index 5a8f4a58cf..a4022cb346 100644
--- 
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -137,7 +137,7 @@ public class EntireSSTableStreamingCorrectFilesCountTest
 
         int totalNumberOfFiles = 
session.transfers.get(store.metadata.id).getTotalNumberOfFiles();
 
-        
assertEquals(ComponentManifest.create(sstable.descriptor).components().size(), 
totalNumberOfFiles);
+        assertEquals(ComponentManifest.create(sstable).components().size(), 
totalNumberOfFiles);
         assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to