This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new b05fe7a Optimize Zero Copy Streaming file containment check by using file sections b05fe7a is described below commit b05fe7ab010218f1fb23b3192e2aea719a9611de Author: Zhao Yang <zhaoyangsingap...@gmail.com> AuthorDate: Mon Mar 23 22:05:53 2020 +0800 Optimize Zero Copy Streaming file containment check by using file sections Patch By Zhao Yang; Reviewed by T Jake Luciani and Dinesh Joshi for CASSANDRA-15657 --- .../db/streaming/CassandraOutgoingFile.java | 56 ++++------------------ .../db/streaming/CassandraOutgoingFileTest.java | 15 +++--- 2 files changed, 19 insertions(+), 52 deletions(-) diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java index e8f5485..237c0af 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java @@ -30,14 +30,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; -import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.KeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.AsyncStreamingOutputPlus; @@ -47,8 +42,6 @@ import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.concurrent.Ref; -import static 
org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper; - /** * used to transfer the part(or whole) of a SSTable data file */ @@ -65,9 +58,8 @@ public class CassandraOutgoingFile implements OutgoingStream private final CassandraStreamHeader header; private final boolean keepSSTableLevel; private final ComponentManifest manifest; - private Boolean isFullyContained; - private final List<Range<Token>> normalizedRanges; + private final boolean shouldStreamEntireSStable; public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges, @@ -78,9 +70,9 @@ public class CassandraOutgoingFile implements OutgoingStream this.ref = ref; this.estimatedKeys = estimatedKeys; this.sections = sections; - this.normalizedRanges = ImmutableList.copyOf(normalizedRanges); this.filename = ref.get().getFilename(); this.manifest = getComponentManifest(ref.get()); + this.shouldStreamEntireSStable = shouldStreamEntireSSTable(); SSTableReader sstable = ref.get(); keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD; @@ -93,7 +85,7 @@ public class CassandraOutgoingFile implements OutgoingStream .withSections(sections) .withCompressionMetadata(sstable.compression ? 
sstable.getCompressionMetadata() : null) .withSerializationHeader(sstable.header.toComponent()) - .isEntireSSTable(shouldStreamEntireSSTable()) + .isEntireSSTable(shouldStreamEntireSStable) .withComponentManifest(manifest) .withFirstKey(sstable.first) .withTableId(sstable.metadata().id) @@ -163,7 +155,7 @@ public class CassandraOutgoingFile implements OutgoingStream CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); - if (shouldStreamEntireSSTable() && out instanceof AsyncStreamingOutputPlus) + if (shouldStreamEntireSStable && out instanceof AsyncStreamingOutputPlus) { CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest); writer.write((AsyncStreamingOutputPlus) out); @@ -185,46 +177,18 @@ public class CassandraOutgoingFile implements OutgoingStream if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards) return false; - ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId()); - - if (cfs == null) - return false; - - AbstractCompactionStrategy compactionStrategy = cfs.getCompactionStrategyManager() - .getCompactionStrategyFor(ref.get()); - - if (compactionStrategy instanceof LeveledCompactionStrategy) - return contained(normalizedRanges, ref.get()); - - return false; + return contained(sections, ref.get()); } @VisibleForTesting - public boolean contained(List<Range<Token>> normalizedRanges, SSTableReader sstable) + public boolean contained(List<SSTableReader.PartitionPositionBounds> sections, SSTableReader sstable) { - if (isFullyContained != null) - return isFullyContained; - - isFullyContained = computeContainment(normalizedRanges, sstable); - return isFullyContained; - } - - private boolean computeContainment(List<Range<Token>> normalizedRanges, SSTableReader sstable) - { - if (normalizedRanges == null) + if (sections == null || sections.isEmpty()) return false; - RangeOwnHelper rangeOwnHelper = new 
RangeOwnHelper(normalizedRanges); - try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) - { - while (iter.hasNext()) - { - DecoratedKey key = iter.next(); - if (!rangeOwnHelper.check(key)) - return false; - } - } - return true; + // if transfer sections contain entire sstable + long transferLength = sections.stream().mapToLong(p -> p.upperPosition - p.lowerPosition).sum(); + return transferLength == sstable.uncompressedLength(); } @Override diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java index 5e44346..9d663b5 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java @@ -89,11 +89,12 @@ public class CassandraOutgoingFileTest { List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken())); + List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges); CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(), - sstable.getPositionsForRanges(requestedRanges), + sections, requestedRanges, sstable.estimatedKeys()); - assertTrue(cof.contained(requestedRanges, sstable)); + assertTrue(cof.contained(sections, sstable)); } @Test @@ -101,11 +102,12 @@ public class CassandraOutgoingFileTest { List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2))); + List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges); CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(), - sstable.getPositionsForRanges(requestedRanges), + sections, requestedRanges, sstable.estimatedKeys()); - assertFalse(cof.contained(requestedRanges, sstable)); + 
assertFalse(cof.contained(sections, sstable)); } @Test @@ -116,11 +118,12 @@ public class CassandraOutgoingFileTest new Range<>(getTokenAtIndex(5), sstable.last.getToken())); requestedRanges = Range.normalize(requestedRanges); + List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges); CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(), - sstable.getPositionsForRanges(requestedRanges), + sections, requestedRanges, sstable.estimatedKeys()); - assertTrue(cof.contained(requestedRanges, sstable)); + assertTrue(cof.contained(sections, sstable)); } private DecoratedKey getKeyAtIndex(int i) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org