This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b05fe7a  Optimize Zero Copy Streaming file containment check by using file sections
b05fe7a is described below

commit b05fe7ab010218f1fb23b3192e2aea719a9611de
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
AuthorDate: Mon Mar 23 22:05:53 2020 +0800

    Optimize Zero Copy Streaming file containment check by using file sections
    
    Patch By Zhao Yang; Reviewed by T Jake Luciani and Dinesh Joshi for CASSANDRA-15657
---
 .../db/streaming/CassandraOutgoingFile.java        | 56 ++++------------------
 .../db/streaming/CassandraOutgoingFileTest.java    | 15 +++---
 2 files changed, 19 insertions(+), 52 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index e8f5485..237c0af 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -30,14 +30,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.AsyncStreamingOutputPlus;
@@ -47,8 +42,6 @@ import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.concurrent.Ref;
 
-import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
-
 /**
  * used to transfer the part(or whole) of a SSTable data file
  */
@@ -65,9 +58,8 @@ public class CassandraOutgoingFile implements OutgoingStream
     private final CassandraStreamHeader header;
     private final boolean keepSSTableLevel;
     private final ComponentManifest manifest;
-    private Boolean isFullyContained;
 
-    private final List<Range<Token>> normalizedRanges;
+    private final boolean shouldStreamEntireSStable;
 
     public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> 
ref,
                                  List<SSTableReader.PartitionPositionBounds> 
sections, List<Range<Token>> normalizedRanges,
@@ -78,9 +70,9 @@ public class CassandraOutgoingFile implements OutgoingStream
         this.ref = ref;
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
-        this.normalizedRanges = ImmutableList.copyOf(normalizedRanges);
         this.filename = ref.get().getFilename();
         this.manifest = getComponentManifest(ref.get());
+        this.shouldStreamEntireSStable = shouldStreamEntireSSTable();
 
         SSTableReader sstable = ref.get();
         keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation 
== StreamOperation.REBUILD;
@@ -93,7 +85,7 @@ public class CassandraOutgoingFile implements OutgoingStream
                                  .withSections(sections)
                                  .withCompressionMetadata(sstable.compression 
? sstable.getCompressionMetadata() : null)
                                  
.withSerializationHeader(sstable.header.toComponent())
-                                 .isEntireSSTable(shouldStreamEntireSSTable())
+                                 .isEntireSSTable(shouldStreamEntireSStable)
                                  .withComponentManifest(manifest)
                                  .withFirstKey(sstable.first)
                                  .withTableId(sstable.metadata().id)
@@ -163,7 +155,7 @@ public class CassandraOutgoingFile implements OutgoingStream
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();
 
-        if (shouldStreamEntireSSTable() && out instanceof 
AsyncStreamingOutputPlus)
+        if (shouldStreamEntireSStable && out instanceof 
AsyncStreamingOutputPlus)
         {
             CassandraEntireSSTableStreamWriter writer = new 
CassandraEntireSSTableStreamWriter(sstable, session, manifest);
             writer.write((AsyncStreamingOutputPlus) out);
@@ -185,46 +177,18 @@ public class CassandraOutgoingFile implements 
OutgoingStream
         if (!DatabaseDescriptor.streamEntireSSTables() || 
ref.get().getSSTableMetadata().hasLegacyCounterShards)
             return false;
 
-        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId());
-
-        if (cfs == null)
-            return false;
-
-        AbstractCompactionStrategy compactionStrategy = 
cfs.getCompactionStrategyManager()
-                                                           
.getCompactionStrategyFor(ref.get());
-
-        if (compactionStrategy instanceof LeveledCompactionStrategy)
-            return contained(normalizedRanges, ref.get());
-
-        return false;
+        return contained(sections, ref.get());
     }
 
     @VisibleForTesting
-    public boolean contained(List<Range<Token>> normalizedRanges, 
SSTableReader sstable)
+    public boolean contained(List<SSTableReader.PartitionPositionBounds> 
sections, SSTableReader sstable)
     {
-        if (isFullyContained != null)
-            return isFullyContained;
-
-        isFullyContained = computeContainment(normalizedRanges, sstable);
-        return isFullyContained;
-    }
-
-    private boolean computeContainment(List<Range<Token>> normalizedRanges, 
SSTableReader sstable)
-    {
-        if (normalizedRanges == null)
+        if (sections == null || sections.isEmpty())
             return false;
 
-        RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
-        try (KeyIterator iter = new KeyIterator(sstable.descriptor, 
sstable.metadata()))
-        {
-            while (iter.hasNext())
-            {
-                DecoratedKey key = iter.next();
-                if (!rangeOwnHelper.check(key))
-                    return false;
-            }
-        }
-        return true;
+        // if transfer sections contain entire sstable
+        long transferLength = sections.stream().mapToLong(p -> p.upperPosition 
- p.lowerPosition).sum();
+        return transferLength == sstable.uncompressedLength();
     }
 
     @Override
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java 
b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
index 5e44346..9d663b5 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -89,11 +89,12 @@ public class CassandraOutgoingFileTest
     {
         List<Range<Token>> requestedRanges = Arrays.asList(new 
Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
 
+        List<SSTableReader.PartitionPositionBounds> sections = 
sstable.getPositionsForRanges(requestedRanges);
         CassandraOutgoingFile cof = new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
-                                                              
sstable.getPositionsForRanges(requestedRanges),
+                                                              sections,
                                                               requestedRanges, 
sstable.estimatedKeys());
 
-        assertTrue(cof.contained(requestedRanges, sstable));
+        assertTrue(cof.contained(sections, sstable));
     }
 
     @Test
@@ -101,11 +102,12 @@ public class CassandraOutgoingFileTest
     {
         List<Range<Token>> requestedRanges = Arrays.asList(new 
Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2)));
 
+        List<SSTableReader.PartitionPositionBounds> sections = 
sstable.getPositionsForRanges(requestedRanges);
         CassandraOutgoingFile cof = new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
-                                                              
sstable.getPositionsForRanges(requestedRanges),
+                                                              sections,
                                                               requestedRanges, 
sstable.estimatedKeys());
 
-        assertFalse(cof.contained(requestedRanges, sstable));
+        assertFalse(cof.contained(sections, sstable));
     }
 
     @Test
@@ -116,11 +118,12 @@ public class CassandraOutgoingFileTest
                                                          new 
Range<>(getTokenAtIndex(5), sstable.last.getToken()));
         requestedRanges = Range.normalize(requestedRanges);
 
+        List<SSTableReader.PartitionPositionBounds> sections = 
sstable.getPositionsForRanges(requestedRanges);
         CassandraOutgoingFile cof = new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
-                                                              
sstable.getPositionsForRanges(requestedRanges),
+                                                              sections,
                                                               requestedRanges, 
sstable.estimatedKeys());
 
-        assertTrue(cof.contained(requestedRanges, sstable));
+        assertTrue(cof.contained(sections, sstable));
     }
 
     private DecoratedKey getKeyAtIndex(int i)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to