Repository: cassandra Updated Branches: refs/heads/trunk 95a52a8bf -> a831b99f9
Refactor Pair usage to avoid boxing ints/longs Patch by Jeff Jirsa; Reviewed by Dinesh Joshi for CASSANDRA-14260 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a831b99f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a831b99f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a831b99f Branch: refs/heads/trunk Commit: a831b99f9123d1c2bdfd70761aca3a05446c9a4c Parents: 95a52a8 Author: Jeff Jirsa <jji...@apple.com> Authored: Wed Apr 11 08:24:40 2018 -0700 Committer: Jeff Jirsa <jji...@apple.com> Committed: Wed Apr 11 08:24:40 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 4 +- .../apache/cassandra/db/ColumnFamilyStore.java | 8 +- .../org/apache/cassandra/db/Directories.java | 38 ++++++-- .../db/SnapshotDetailsTabularData.java | 6 +- .../db/repair/CassandraValidationIterator.java | 4 +- .../db/streaming/CassandraOutgoingFile.java | 5 +- .../db/streaming/CassandraStreamHeader.java | 30 +++---- .../db/streaming/CassandraStreamManager.java | 2 +- .../db/streaming/CassandraStreamReader.java | 8 +- .../db/streaming/CassandraStreamWriter.java | 17 ++-- .../CompressedCassandraStreamReader.java | 8 +- .../CompressedCassandraStreamWriter.java | 25 +++--- .../cassandra/db/streaming/CompressionInfo.java | 4 +- .../io/compress/CompressionMetadata.java | 22 ++--- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../io/sstable/format/SSTableReader.java | 83 +++++++++++++++--- .../io/sstable/format/big/BigTableScanner.java | 3 +- .../org/apache/cassandra/net/MessageOut.java | 44 ++++++++-- .../apache/cassandra/service/StorageProxy.java | 91 ++++++++++++++++---- .../cassandra/service/StorageService.java | 2 +- .../cassandra/db/ColumnFamilyStoreTest.java | 3 +- .../apache/cassandra/db/DirectoriesTest.java | 9 +- .../cassandra/io/sstable/SSTableReaderTest.java | 17 ++-- .../io/sstable/SSTableRewriterTest.java | 11 ++- .../compression/CompressedInputStreamTest.java | 12 +-- 25 files changed, 310 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 707ea6b..2dc2021 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,7 @@ 4.0 + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260) * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889) - * Rename internals to reflect CQL vocabulary - (CASSANDRA-14354) + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354) * Add support for hybrid MIN(), MAX() speculative retry policies (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352) * Fix some regressions caused by 14058 (CASSANDRA-14353) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 4c546dd..bfab6ea 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1451,9 +1451,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); for (SSTableReader sstable : sstables) { - List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(ranges); - for (Pair<Long, Long> position : positions) - expectedFileSize += position.right - position.left; + List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(ranges); + for (SSTableReader.PartitionPositionBounds position : positions) + expectedFileSize += position.upperPosition - position.lowerPosition; } double compressionRatio = metric.compressionRatio.getValue(); @@ -1965,7 +1965,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return Return a map of all snapshots to space being used * The pair for a snapshot has true size and size on disk. */ - public Map<String, Pair<Long,Long>> getSnapshotDetails() + public Map<String, Directories.SnapshotSizeDetails> getSnapshotDetails() { return getDirectories().getSnapshotDetails(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 2737d0c..1fc071a 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -784,19 +784,19 @@ public class Directories * @return Return a map of all snapshots to space being used * The pair for a snapshot has size on disk and true size. */ - public Map<String, Pair<Long, Long>> getSnapshotDetails() + public Map<String, SnapshotSizeDetails> getSnapshotDetails() { List<File> snapshots = listSnapshots(); - final Map<String, Pair<Long, Long>> snapshotSpaceMap = Maps.newHashMapWithExpectedSize(snapshots.size()); + final Map<String, SnapshotSizeDetails> snapshotSpaceMap = Maps.newHashMapWithExpectedSize(snapshots.size()); for (File snapshot : snapshots) { final long sizeOnDisk = FileUtils.folderSize(snapshot); final long trueSize = getTrueAllocatedSizeIn(snapshot); - Pair<Long, Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName()); + SnapshotSizeDetails spaceUsed = snapshotSpaceMap.get(snapshot.getName()); if (spaceUsed == null) - spaceUsed = Pair.create(sizeOnDisk,trueSize); + spaceUsed = new SnapshotSizeDetails(sizeOnDisk,trueSize); else - spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize); + spaceUsed = new SnapshotSizeDetails(spaceUsed.sizeOnDiskBytes + sizeOnDisk, spaceUsed.dataSizeBytes + trueSize); snapshotSpaceMap.put(snapshot.getName(), spaceUsed); } return snapshotSpaceMap; @@ -1032,4 +1032,32 @@ public class Directories && !toSkip.contains(file); } } + + public static class SnapshotSizeDetails + { + final long sizeOnDiskBytes; + final long dataSizeBytes; + + private SnapshotSizeDetails(long sizeOnDiskBytes, long dataSizeBytes) + { + this.sizeOnDiskBytes = sizeOnDiskBytes; + this.dataSizeBytes = dataSizeBytes; + } + + @Override + public final int hashCode() + { + int hashCode = (int) sizeOnDiskBytes ^ (int) (sizeOnDiskBytes >>> 32); + return 31 * (hashCode ^ (int) ((int) dataSizeBytes ^ (dataSizeBytes >>> 32))); + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof SnapshotSizeDetails)) + return false; + SnapshotSizeDetails that = (SnapshotSizeDetails)o; + return sizeOnDiskBytes == that.sizeOnDiskBytes && dataSizeBytes == that.dataSizeBytes; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java b/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java index 97caea1..5ef729a 100644 --- a/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java +++ b/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java @@ -69,12 +69,12 @@ public class SnapshotDetailsTabularData } - public static void from(final String snapshot, final String ks, final String cf, Map.Entry<String, Pair<Long,Long>> snapshotDetail, TabularDataSupport result) + public static void from(final String snapshot, final String ks, final String cf, Map.Entry<String, Directories.SnapshotSizeDetails> snapshotDetail, TabularDataSupport result) { try { - final String totalSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().left); - final String liveSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().right); + final String totalSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().sizeOnDiskBytes); + final String liveSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().dataSizeBytes); result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES, new Object[]{ snapshot, ks, cf, liveSize, totalSize })); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java index 992a384..6fa0be2 100644 --- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java +++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java @@ -243,8 +243,8 @@ public class CassandraValidationIterator extends ValidationPartitionIterator long estimatedTotalBytes = 0; for (SSTableReader sstable : sstables) { - for (Pair<Long, Long> positionsForRanges : sstable.getPositionsForRanges(ranges)) - estimatedTotalBytes += positionsForRanges.right - positionsForRanges.left; + for (SSTableReader.PartitionPositionBounds positionsForRanges : sstable.getPositionsForRanges(ranges)) + estimatedTotalBytes += positionsForRanges.upperPosition - positionsForRanges.lowerPosition; } estimatedBytes = estimatedTotalBytes; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java index 3fd3f9d..6ec1f85 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java @@ -32,7 +32,6 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.OutgoingStream; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Ref; /** @@ -42,12 +41,12 @@ public class CassandraOutgoingFile implements OutgoingStream { private final Ref<SSTableReader> ref; private final long estimatedKeys; - private final List<Pair<Long, Long>> sections; + private final List<SSTableReader.PartitionPositionBounds> sections; private final String filename; private final CassandraStreamHeader header; private final boolean keepSSTableLevel; - public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys) + public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<SSTableReader.PartitionPositionBounds> sections, long estimatedKeys) { Preconditions.checkNotNull(ref.get()); this.ref = ref; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java index 2603da1..43631b0 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java @@ -28,10 +28,10 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.Pair; public class CassandraStreamHeader { @@ -41,7 +41,7 @@ public class CassandraStreamHeader /** SSTable format **/ public final SSTableFormat.Type format; public final long estimatedKeys; - public final List<Pair<Long, Long>> sections; + public final List<SSTableReader.PartitionPositionBounds> sections; /** * Compression info for SSTable to send. Can be null if SSTable is not compressed. * On sender, this field is always null to avoid holding large number of Chunks. @@ -55,7 +55,7 @@ public class CassandraStreamHeader /* cached size value */ private transient final long size; - private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) + private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) { this.version = version; this.format = format; @@ -69,12 +69,12 @@ public class CassandraStreamHeader this.size = calculateSize(); } - public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header) + public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header) { this(version, format, estimatedKeys, sections, compressionMetadata, null, sstableLevel, header); } - public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) + public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) { this(version, format, estimatedKeys, sections, null, compressionInfo, sstableLevel, header); } @@ -103,8 +103,8 @@ public class CassandraStreamHeader } else { - for (Pair<Long, Long> section : sections) - transferSize += section.right - section.left; + for (SSTableReader.PartitionPositionBounds section : sections) + transferSize += section.upperPosition - section.lowerPosition; } return transferSize; } @@ -160,10 +160,10 @@ public class CassandraStreamHeader out.writeLong(header.estimatedKeys); out.writeInt(header.sections.size()); - for (Pair<Long, Long> section : header.sections) + for (SSTableReader.PartitionPositionBounds section : header.sections) { - out.writeLong(section.left); - out.writeLong(section.right); + out.writeLong(section.lowerPosition); + out.writeLong(section.upperPosition); } header.calculateCompressionInfo(); CompressionInfo.serializer.serialize(header.compressionInfo, out, version); @@ -178,9 +178,9 @@ public class CassandraStreamHeader long estimatedKeys = in.readLong(); int count = in.readInt(); - List<Pair<Long, Long>> sections = new ArrayList<>(count); + List<SSTableReader.PartitionPositionBounds> sections = new ArrayList<>(count); for (int k = 0; k < count; k++) - sections.add(Pair.create(in.readLong(), in.readLong())); + sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); int sstableLevel = in.readInt(); SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); @@ -196,10 +196,10 @@ public class CassandraStreamHeader size += TypeSizes.sizeof(header.estimatedKeys); size += TypeSizes.sizeof(header.sections.size()); - for (Pair<Long, Long> section : header.sections) + for (SSTableReader.PartitionPositionBounds section : header.sections) { - size += TypeSizes.sizeof(section.left); - size += TypeSizes.sizeof(section.right); + size += TypeSizes.sizeof(section.lowerPosition); + size += TypeSizes.sizeof(section.upperPosition); } size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); size += TypeSizes.sizeof(header.sstableLevel); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java index 466fa36..673b62c 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java @@ -146,7 +146,7 @@ public class CassandraStreamManager implements TableStreamManager for (SSTableReader sstable: refs) { Ref<SSTableReader> ref = refs.get(sstable); - List<Pair<Long, Long>> sections = sstable.getPositionsForRanges(ranges); + List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges); if (sections.isEmpty()) { ref.release(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 26ef5ed..8e7b198 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.TrackedDataInputPlus; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -48,7 +49,6 @@ import org.apache.cassandra.streaming.messages.StreamMessageHeader; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; /** * CassandraStreamReader reads from stream and writes to SSTable. @@ -58,7 +58,7 @@ public class CassandraStreamReader private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class); protected final TableId tableId; protected final long estimatedKeys; - protected final Collection<Pair<Long, Long>> sections; + protected final Collection<SSTableReader.PartitionPositionBounds> sections; protected final StreamSession session; protected final Version inputVersion; protected final long repairedAt; @@ -162,8 +162,8 @@ public class CassandraStreamReader protected long totalSize() { long size = 0; - for (Pair<Long, Long> section : sections) - size += section.right - section.left; + for (SSTableReader.PartitionPositionBounds section : sections) + size += section.upperPosition - section.lowerPosition; return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java index b86f99a..c6dd9a9 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java @@ -38,7 +38,6 @@ import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; /** * CassandraStreamWriter writes given section of the SSTable to given channel. @@ -50,11 +49,11 @@ public class CassandraStreamWriter private static final Logger logger = LoggerFactory.getLogger(CassandraStreamWriter.class); protected final SSTableReader sstable; - protected final Collection<Pair<Long, Long>> sections; + protected final Collection<SSTableReader.PartitionPositionBounds> sections; protected final StreamRateLimiter limiter; protected final StreamSession session; - public CassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session) + public CassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, StreamSession session) { this.session = session; this.sstable = sstable; @@ -89,16 +88,16 @@ public class CassandraStreamWriter try (DataOutputStreamPlus compressedOutput = new ByteBufCompressionDataOutputStreamPlus(output, limiter)) { // stream each of the required sections of the file - for (Pair<Long, Long> section : sections) + for (SSTableReader.PartitionPositionBounds section : sections) { - long start = validator == null ? section.left : validator.chunkStart(section.left); + long start = validator == null ? section.lowerPosition : validator.chunkStart(section.lowerPosition); // if the transfer does not start on the valididator's chunk boundary, this is the number of bytes to offset by - int transferOffset = (int) (section.left - start); + int transferOffset = (int) (section.lowerPosition - start); if (validator != null) validator.seek(start); // length of the section to read - long length = section.right - start; + long length = section.upperPosition - start; // tracks write progress long bytesRead = 0; while (bytesRead < length) @@ -124,8 +123,8 @@ public class CassandraStreamWriter protected long totalSize() { long size = 0; - for (Pair<Long, Long> section : sections) - size += section.right - section.left; + for (SSTableReader.PartitionPositionBounds section : sections) + size += section.upperPosition - section.lowerPosition; return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java index 343d7ed..c71edfb 100644 --- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.TrackedDataInputPlus; import org.apache.cassandra.streaming.ProgressInfo; @@ -33,7 +34,6 @@ import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.messages.StreamMessageHeader; import org.apache.cassandra.utils.ChecksumType; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; @@ -83,14 +83,14 @@ public class CompressedCassandraStreamReader extends CassandraStreamReader writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); String filename = writer.getFilename(); int sectionIdx = 0; - for (Pair<Long, Long> section : sections) + for (SSTableReader.PartitionPositionBounds section : sections) { assert cis.getTotalCompressedBytesRead() <= totalSize; - long sectionLength = section.right - section.left; + long sectionLength = section.upperPosition - section.lowerPosition; logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength); // skip to beginning of section inside chunk - cis.position(section.left); + cis.position(section.lowerPosition); in.reset(0); while (in.getBytesRead() < sectionLength) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java index 3fcbc38..c5b0c53 100644 --- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java @@ -36,7 +36,6 @@ import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; /** * CassandraStreamWriter for compressed SSTable. @@ -49,7 +48,7 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter private final CompressionInfo compressionInfo; - public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session) + public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session) { super(sstable, sections, session); this.compressionInfo = compressionInfo; @@ -67,15 +66,15 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter { long progress = 0L; // calculate chunks to transfer. we want to send continuous chunks altogether. - List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks); + List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks); int sectionIdx = 0; // stream each of the required sections of the file - for (final Pair<Long, Long> section : sections) + for (final SSTableReader.PartitionPositionBounds section : sections) { // length of the section to stream - long length = section.right - section.left; + long length = section.upperPosition - section.lowerPosition; logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length); @@ -90,7 +89,7 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter long lastWrite; try { - lastWrite = fc.read(outBuffer, section.left + bytesTransferred); + lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred); assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer); outBuffer.flip(); output.writeToChannel(outBuffer); @@ -122,28 +121,28 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter } // chunks are assumed to be sorted by offset - private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks) + private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks) { - List<Pair<Long, Long>> transferSections = new ArrayList<>(); - Pair<Long, Long> lastSection = null; + List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>(); + SSTableReader.PartitionPositionBounds lastSection = null; for (CompressionMetadata.Chunk chunk : chunks) { if (lastSection != null) { - if (chunk.offset == lastSection.right) + if (chunk.offset == lastSection.upperPosition) { // extend previous section to end of this chunk - lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC + lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC } else { transferSections.add(lastSection); - lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4); + lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4); } } else { - lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4); + lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4); } } if (lastSection != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java index 0f0d5c7..aef57e3 100644 --- a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java +++ b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java @@ -23,10 +23,10 @@ import java.util.List; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.Pair; /** * Container that carries compression parameters and chunks to decompress data from stream. @@ -45,7 +45,7 @@ public class CompressionInfo this.parameters = parameters; } - static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<Pair<Long, Long>> sections) + static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<SSTableReader.PartitionPositionBounds> sections) { if (metadata == null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index fa0f9f7..e6ef5c3 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -49,12 +49,12 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.Memory; import org.apache.cassandra.io.util.SafeMemory; import org.apache.cassandra.schema.CompressionParams; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.SyncUtil; import org.apache.cassandra.utils.concurrent.Transactional; import org.apache.cassandra.utils.concurrent.Ref; @@ -262,15 +262,15 @@ public class CompressionMetadata * @param sections Collection of sections in uncompressed file. Should not contain sections that overlap each other. * @return Total chunk size in bytes for given sections including checksum. */ - public long getTotalSizeForSections(Collection<Pair<Long, Long>> sections) + public long getTotalSizeForSections(Collection<SSTableReader.PartitionPositionBounds> sections) { long size = 0; long lastOffset = -1; - for (Pair<Long, Long> section : sections) + for (SSTableReader.PartitionPositionBounds section : sections) { - int startIndex = (int) (section.left / parameters.chunkLength()); - int endIndex = (int) (section.right / parameters.chunkLength()); - endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex; + int startIndex = (int) (section.lowerPosition / parameters.chunkLength()); + int endIndex = (int) (section.upperPosition / parameters.chunkLength()); + endIndex = section.upperPosition % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex; for (int i = startIndex; i <= endIndex; i++) { long offset = i * 8L; @@ -292,7 +292,7 @@ public class CompressionMetadata * @param sections Collection of sections in uncompressed file * @return Array of chunks which corresponds to given sections of uncompressed file, sorted by chunk offset */ - public Chunk[] getChunksForSections(Collection<Pair<Long, Long>> sections) + public Chunk[] getChunksForSections(Collection<SSTableReader.PartitionPositionBounds> sections) { // use SortedSet to eliminate duplicates and sort by chunk offset SortedSet<Chunk> offsets = new TreeSet<Chunk>(new Comparator<Chunk>() @@ -302,11 +302,11 @@ public class CompressionMetadata return Longs.compare(o1.offset, o2.offset); } }); - for (Pair<Long, Long> section : sections) + for (SSTableReader.PartitionPositionBounds section : sections) { - int startIndex = (int) (section.left / parameters.chunkLength()); - int endIndex = (int) (section.right / parameters.chunkLength()); - endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex; + int startIndex = (int) (section.lowerPosition / parameters.chunkLength()); + int endIndex = (int) (section.upperPosition / parameters.chunkLength()); + endIndex = section.upperPosition % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex; for (int i = startIndex; i <= endIndex; i++) { long offset = i * 8L; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index a6985f7..ebc35e7 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -128,7 +128,7 @@ public class SSTableLoader implements StreamEventHandler InetAddressAndPort endpoint = entry.getKey(); Collection<Range<Token>> tokenRanges = entry.getValue(); - List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges); + List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges); long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges); Ref<SSTableReader> ref = sstable.ref(); OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index fe1695a..dd93e5f 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1354,9 +1354,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public long estimatedKeysForRanges(Collection<Range<Token>> ranges) { long sampleKeyCount = 0; - List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges); - for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes) - sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1); + List<IndexesBounds> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges); + for (IndexesBounds sampleIndexRange : sampleIndexes) + sampleKeyCount += (sampleIndexRange.upperPosition - sampleIndexRange.lowerPosition + 1); // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling long estimatedKeys = sampleKeyCount * ((long) Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel(); @@ -1388,10 +1388,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return indexSummary.getKey(index); } - private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges) + private static List<IndexesBounds> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges) { // use the index to determine a minimal section for each range - List<Pair<Integer,Integer>> positions = new ArrayList<>(); + List<IndexesBounds> positions = new ArrayList<>(); for (Range<Token> range : Range.normalize(ranges)) { @@ -1425,14 +1425,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS if (left > right) // empty range continue; - positions.add(Pair.create(left, right)); + positions.add(new IndexesBounds(left, right)); } return positions; } public Iterable<DecoratedKey> getKeySamples(final Range<Token> range) { - final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range)); + final List<IndexesBounds> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range)); if (indexRanges.isEmpty()) return Collections.emptyList(); @@ -1443,18 +1443,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { return new Iterator<DecoratedKey>() { - private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator(); - private Pair<Integer, Integer> current; + private Iterator<IndexesBounds> rangeIter = indexRanges.iterator(); + private IndexesBounds current; private int idx; public boolean hasNext() { - if (current == null || idx > current.right) + if (current == null || idx > current.upperPosition) { if (rangeIter.hasNext()) { current = rangeIter.next(); - idx = current.left; + idx = current.lowerPosition; return true; } return false; @@ -1482,10 +1482,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges. * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable. */ - public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges) + public List<PartitionPositionBounds> getPositionsForRanges(Collection<Range<Token>> ranges) { // use the index to determine a minimal section for each range - List<Pair<Long,Long>> positions = new ArrayList<>(); + List<PartitionPositionBounds> positions = new ArrayList<>(); for (Range<Token> range : Range.normalize(ranges)) { assert !range.isWrapAround() || range.right.isMinimum(); @@ -1507,7 +1507,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS continue; assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right); - positions.add(Pair.create(left, right)); + positions.add(new PartitionPositionBounds(left, right)); } return positions; } @@ -2362,4 +2362,59 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SerializationHeader header); } + + public static class PartitionPositionBounds + { + public final long lowerPosition; + public final long upperPosition; + + public PartitionPositionBounds(long lower, long upper) + { + this.lowerPosition = lower; + this.upperPosition = upper; + } + + @Override + public final int hashCode() + { + int hashCode = (int) lowerPosition ^ (int) (lowerPosition >>> 32); + return 31 * (hashCode ^ (int) ((int) upperPosition ^ (upperPosition >>> 32))); + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof PartitionPositionBounds)) + return false; + PartitionPositionBounds that = (PartitionPositionBounds)o; + return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition; + } + } + + public static class IndexesBounds + { + public final int lowerPosition; + public final int upperPosition; + + public IndexesBounds(int lower, int upper) + { + this.lowerPosition = lower; + this.upperPosition = upper; + } + + @Override + public final int hashCode() + { + return 31 * lowerPosition * upperPosition; + } + + @Override + public final boolean equals(Object o) + { + if (!(o instanceof IndexesBounds)) + return false; + IndexesBounds that = (IndexesBounds) o; + return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index b01573c..cb25c6b 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -42,7 +42,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReadsListener; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.dht.AbstractBounds.isEmpty; import static org.apache.cassandra.dht.AbstractBounds.maxLeft; @@ -84,7 +83,7 @@ public class BigTableScanner implements ISSTableScanner public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges) { // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) - List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(tokenRanges); + List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(tokenRanges); if (positions.isEmpty()) return new EmptySSTableScanner(sstable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 236a770..30968df 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -215,7 +215,7 @@ public class MessageOut<T> } } - private Pair<Long, Long> calculateSerializedSize(int version) + private MessageOutSizes calculateSerializedSize(int version) { long size = 0; size += CompactEndpointSerializationHelper.instance.serializedSize(from, version); @@ -236,7 +236,7 @@ public class MessageOut<T> assert payloadSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages size += TypeSizes.sizeof((int) payloadSize); size += payloadSize; - return Pair.create(size, payloadSize); + return new MessageOutSizes(size, payloadSize); } /** @@ -258,18 +258,18 @@ public class MessageOut<T> if (serializedSize > 0 && serializedSizeVersion == version) return serializedSize; - Pair<Long, Long> sizes = calculateSerializedSize(version); - if (sizes.left > Integer.MAX_VALUE) - throw new IllegalStateException("message size exceeds maximum allowed size: size = " + sizes.left); + MessageOutSizes sizes = calculateSerializedSize(version); + if (sizes.messageSize > Integer.MAX_VALUE) + throw new IllegalStateException("message size exceeds maximum allowed size: size = " + sizes.messageSize); if (serializedSizeVersion == SERIALIZED_SIZE_VERSION_UNDEFINED) { - serializedSize = sizes.left.intValue(); - payloadSerializedSize = sizes.right.intValue(); + serializedSize = Ints.checkedCast(sizes.messageSize); + payloadSerializedSize = Ints.checkedCast(sizes.payloadSize); serializedSizeVersion = version; } - return sizes.left.intValue(); + return Ints.checkedCast(sizes.messageSize); } public Object getParameter(ParameterType type) @@ -283,4 +283,32 @@ public class MessageOut<T> } return null; } + + private static class MessageOutSizes + { + public final long messageSize; + public final long payloadSize; + + private MessageOutSizes(long messageSize, long payloadSize) + { + this.messageSize = messageSize; + this.payloadSize = payloadSize; + } + + @Override + public final int hashCode() + { + int hashCode = (int) messageSize ^ (int) (messageSize >>> 32); + return 31 * (hashCode ^ (int) ((int) payloadSize ^ (payloadSize >>> 32))); + } + + @Override + public final boolean equals(Object o) + { + if (!(o instanceof MessageOutSizes)) + return false; + MessageOutSizes that = (MessageOutSizes) o; + return messageSize == that.messageSize && payloadSize == that.payloadSize; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index bacc3a8..7f0837b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -250,13 +250,13 @@ public class StorageProxy implements StorageProxyMBean while (System.nanoTime() - queryStartNanoTime < timeout) { // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddressAndPort>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); - List<InetAddressAndPort> liveEndpoints = p.left; - int requiredParticipants = p.right; + PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyForPaxos); + List<InetAddressAndPort> liveEndpoints = p.liveEndpoints; + int requiredParticipants = p.participants; - final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); - final UUID ballot = pair.left; - contentions += pair.right; + final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); + final UUID ballot = pair.ballot; + contentions += pair.contentions; // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); @@ -356,7 +356,7 @@ public class StorageProxy implements StorageProxyMBean }; } - private static Pair<List<InetAddressAndPort>, Integer> getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException + private static PaxosParticipants getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException { Token tk = key.getToken(); List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk); @@ -384,7 +384,7 @@ public class StorageProxy implements StorageProxyMBean participants + 1, liveEndpoints.size()); - return Pair.create(liveEndpoints, requiredParticipants); + return new PaxosParticipants(liveEndpoints, requiredParticipants); } /** @@ -393,7 +393,7 @@ public class StorageProxy implements StorageProxyMBean * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. Otherwise, return null. */ - private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime, + private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime, DecoratedKey key, TableMetadata metadata, List<InetAddressAndPort> liveEndpoints, @@ -486,7 +486,7 @@ public class StorageProxy implements StorageProxyMBean continue; } - return Pair.create(ballot, contentions); + return new PaxosBallotAndContention(ballot, contentions); } recordCasContention(contentions); @@ -1662,9 +1662,9 @@ public class StorageProxy implements StorageProxyMBean try { // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - Pair<List<InetAddressAndPort>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel); - List<InetAddressAndPort> liveEndpoints = p.left; - int requiredParticipants = p.right; + PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyLevel); + List<InetAddressAndPort> liveEndpoints = p.liveEndpoints; + int requiredParticipants = p.participants; // does the work of applying in-progress writes; throws UAE or timeout if it can't final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL @@ -1673,9 +1673,9 @@ public class StorageProxy implements StorageProxyMBean try { - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); - if (pair.right > 0) - casReadMetrics.contention.update(pair.right); + final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); + if (pair.contentions > 0) + casReadMetrics.contention.update(pair.contentions); } catch (WriteTimeoutException e) { @@ -2829,4 +2829,63 @@ public class StorageProxy implements StorageProxyMBean public void setOtcBacklogExpirationInterval(int intervalInMillis) { DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis); } + + + static class PaxosParticipants + { + final List<InetAddressAndPort> liveEndpoints; + final int participants; + + PaxosParticipants(List<InetAddressAndPort> liveEndpoints, int participants) + { + this.liveEndpoints = liveEndpoints; + this.participants = participants; + } + + @Override + public final int hashCode() + { + int hashCode = 31 + (liveEndpoints == null ? 0 : liveEndpoints.hashCode()); + return 31 * hashCode * this.participants; + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof PaxosParticipants)) + return false; + PaxosParticipants that = (PaxosParticipants)o; + // handles nulls properly + return Objects.equals(liveEndpoints, that.liveEndpoints) && participants == that.participants; + } + } + + static class PaxosBallotAndContention + { + final UUID ballot; + final int contentions; + + PaxosBallotAndContention(UUID ballot, int contentions) + { + this.ballot = ballot; + this.contentions = contentions; + } + + @Override + public final int hashCode() + { + int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode()); + return 31 * hashCode * this.contentions; + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof PaxosBallotAndContention)) + return false; + PaxosBallotAndContention that = (PaxosBallotAndContention)o; + // handles nulls properly + return Objects.equals(ballot, that.ballot) && contentions == that.contentions; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8e73439..91206c1 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3448,7 +3448,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores()) { - for (Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet()) + for (Map.Entry<String, Directories.SnapshotSizeDetails> snapshotDetail : cfStore.getSnapshotDetails().entrySet()) { TabularDataSupport data = (TabularDataSupport)snapshotMap.get(snapshotDetail.getKey()); if (data == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index b553a12..888cdc6 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -49,7 +49,6 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; import static junit.framework.Assert.assertNotNull; @RunWith(OrderedJUnit4ClassRunner.class) @@ -240,7 +239,7 @@ public class ColumnFamilyStoreTest cfs.snapshot("nonEphemeralSnapshot", null, false, false); cfs.snapshot("ephemeralSnapshot", null, true, false); - Map<String, Pair<Long, Long>> snapshotDetails = cfs.getSnapshotDetails(); + Map<String, Directories.SnapshotSizeDetails> snapshotDetails = cfs.getSnapshotDetails(); assertEquals(2, snapshotDetails.size()); assertTrue(snapshotDetails.containsKey("ephemeralSnapshot")); assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/db/DirectoriesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java index ffd4d3e..bbbb4e6 100644 --- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java +++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java @@ -46,7 +46,6 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.service.DefaultFSErrorHandler; -import org.apache.cassandra.utils.Pair; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -216,13 +215,13 @@ public class DirectoriesTest assertEquals(40, indexDirectories.trueSnapshotsSize()); // check snapshot details - Map<String, Pair<Long, Long>> parentSnapshotDetail = parentDirectories.getSnapshotDetails(); + Map<String, Directories.SnapshotSizeDetails> parentSnapshotDetail = parentDirectories.getSnapshotDetails(); assertTrue(parentSnapshotDetail.containsKey("test")); - assertEquals(30L, parentSnapshotDetail.get("test").right.longValue()); + assertEquals(30L, parentSnapshotDetail.get("test").dataSizeBytes); - Map<String, Pair<Long, Long>> indexSnapshotDetail = indexDirectories.getSnapshotDetails(); + Map<String, Directories.SnapshotSizeDetails> indexSnapshotDetail = indexDirectories.getSnapshotDetails(); assertTrue(indexSnapshotDetail.containsKey("test")); - assertEquals(40L, indexSnapshotDetail.get("test").right.longValue()); + assertEquals(40L, indexSnapshotDetail.get("test").dataSizeBytes); // check backup directory File parentBackupDirectory = Directories.getBackupsDirectory(parentDesc); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 0ebf0a2..6f94696 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -55,7 +55,6 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FilterFactory; -import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.junit.Assert.assertEquals; @@ -127,11 +126,11 @@ public class SSTableReaderTest // confirm that positions increase continuously SSTableReader sstable = store.getLiveSSTables().iterator().next(); long previous = -1; - for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges)) + for (SSTableReader.PartitionPositionBounds section : sstable.getPositionsForRanges(ranges)) { - assert previous <= section.left : previous + " ! < " + section.left; - assert section.left < section.right : section.left + " ! < " + section.right; - previous = section.right; + assert previous <= section.lowerPosition : previous + " ! < " + section.lowerPosition; + assert section.lowerPosition < section.upperPosition : section.lowerPosition + " ! < " + section.upperPosition; + previous = section.upperPosition; } } @@ -270,13 +269,13 @@ public class SSTableReaderTest long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position; long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position; - Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0); + SSTableReader.PartitionPositionBounds p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0); // range are start exclusive so we should start at 3 - assert p.left == p3; + assert p.lowerPosition == p3; // to capture 6 we have to stop at the start of 7 - assert p.right == p7; + assert p.upperPosition == p7; } @Test @@ -537,7 +536,7 @@ public class SSTableReaderTest ranges.add(new Range<Token>(t(98), t(99))); SSTableReader sstable = store.getLiveSSTables().iterator().next(); - List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges); + List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges); assert sections.size() == 1 : "Expected to find range in sstable" ; // re-open the same sstable as it would be during bulk loading http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 5c3e8c9..6412ef4 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -60,7 +60,6 @@ import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; import static org.junit.Assert.*; @@ -172,14 +171,14 @@ public class SSTableRewriterTest extends SSTableWriterTestBase { SSTableReader c = txn.current(sstables.iterator().next()); Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.getPartitioner().getMinimumToken(), cfs.getPartitioner().getMinimumToken())); - List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r); - List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r); + List<SSTableReader.PartitionPositionBounds> tmplinkPositions = sstable.getPositionsForRanges(r); + List<SSTableReader.PartitionPositionBounds> compactingPositions = c.getPositionsForRanges(r); assertEquals(1, tmplinkPositions.size()); assertEquals(1, compactingPositions.size()); - assertEquals(0, tmplinkPositions.get(0).left.longValue()); + assertEquals(0, tmplinkPositions.get(0).lowerPosition); // make sure we have no overlap between the early opened file and the compacting one: - assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left.longValue()); - assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue()); + assertEquals(tmplinkPositions.get(0).upperPosition, compactingPositions.get(0).lowerPosition); + assertEquals(c.uncompressedLength(), compactingPositions.get(0).upperPosition); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java index 60d8b6c..dbca906 100644 --- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java @@ -28,6 +28,7 @@ import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.SequentialWriterOption; import org.apache.cassandra.schema.CompressionParams; @@ -37,7 +38,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.db.streaming.CompressedInputStream; import org.apache.cassandra.db.streaming.CompressionInfo; import org.apache.cassandra.utils.ChecksumType; -import org.apache.cassandra.utils.Pair; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -129,11 +129,11 @@ public class CompressedInputStreamTest } CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath()); - List<Pair<Long, Long>> sections = new ArrayList<>(); + List<SSTableReader.PartitionPositionBounds> sections = new ArrayList<>(); for (long l : valuesToCheck) { long position = index.get(l); - sections.add(Pair.create(position, position + 8)); + sections.add(new SSTableReader.PartitionPositionBounds(position, position + 8)); } CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections); long totalSize = comp.getTotalSizeForSections(sections); @@ -179,14 +179,14 @@ public class CompressedInputStreamTest { for (int i = 0; i < sections.size(); i++) { - input.position(sections.get(i).left); + input.position(sections.get(i).lowerPosition); long readValue = in.readLong(); assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue); } } } - private static void testException(List<Pair<Long, Long>> sections, CompressionInfo info) throws IOException + private static void testException(List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo info) throws IOException { CompressedInputStream input = new CompressedInputStream(new DataInputStreamPlus(new ByteArrayInputStream(new byte[0])), info, ChecksumType.CRC32, () -> 1.0); @@ -195,7 +195,7 @@ public class CompressedInputStreamTest for (int i = 0; i < sections.size(); i++) { try { - input.position(sections.get(i).left); + input.position(sections.get(i).lowerPosition); in.readLong(); fail("Should have thrown IOException"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org