Repository: cassandra
Updated Branches:
  refs/heads/trunk 95a52a8bf -> a831b99f9


Refactor Pair usage to avoid boxing ints/longs

Patch by Jeff Jirsa; Reviewed by Dinesh Joshi for CASSANDRA-14260
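
[Editor's note] For context: org.apache.cassandra.utils.Pair is generic, so a Pair<Long, Long> holds two boxed java.lang.Long objects and every read unboxes them; this patch replaces such pairs with small classes holding primitive long/int fields. A minimal illustration (PartitionPositionBounds is from this patch; the snippet itself is illustrative only):

    // before: creation boxes both longs, the subtraction unboxes them again
    Pair<Long, Long> boxed = Pair.create(0L, 1024L);
    long boxedLength = boxed.right - boxed.left;

    // after: plain long fields, no boxing beyond the holder object itself
    SSTableReader.PartitionPositionBounds bounds =
        new SSTableReader.PartitionPositionBounds(0L, 1024L);
    long length = bounds.upperPosition - bounds.lowerPosition;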


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a831b99f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a831b99f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a831b99f

Branch: refs/heads/trunk
Commit: a831b99f9123d1c2bdfd70761aca3a05446c9a4c
Parents: 95a52a8
Author: Jeff Jirsa <jji...@apple.com>
Authored: Wed Apr 11 08:24:40 2018 -0700
Committer: Jeff Jirsa <jji...@apple.com>
Committed: Wed Apr 11 08:24:40 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  8 +-
 .../org/apache/cassandra/db/Directories.java    | 38 ++++++--
 .../db/SnapshotDetailsTabularData.java          |  6 +-
 .../db/repair/CassandraValidationIterator.java  |  4 +-
 .../db/streaming/CassandraOutgoingFile.java     |  5 +-
 .../db/streaming/CassandraStreamHeader.java     | 30 +++----
 .../db/streaming/CassandraStreamManager.java    |  2 +-
 .../db/streaming/CassandraStreamReader.java     |  8 +-
 .../db/streaming/CassandraStreamWriter.java     | 17 ++--
 .../CompressedCassandraStreamReader.java        |  8 +-
 .../CompressedCassandraStreamWriter.java        | 25 +++---
 .../cassandra/db/streaming/CompressionInfo.java |  4 +-
 .../io/compress/CompressionMetadata.java        | 22 ++---
 .../cassandra/io/sstable/SSTableLoader.java     |  2 +-
 .../io/sstable/format/SSTableReader.java        | 83 +++++++++++++++---
 .../io/sstable/format/big/BigTableScanner.java  |  3 +-
 .../org/apache/cassandra/net/MessageOut.java    | 44 ++++++++--
 .../apache/cassandra/service/StorageProxy.java  | 91 ++++++++++++++++----
 .../cassandra/service/StorageService.java       |  2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |  3 +-
 .../apache/cassandra/db/DirectoriesTest.java    |  9 +-
 .../cassandra/io/sstable/SSTableReaderTest.java | 17 ++--
 .../io/sstable/SSTableRewriterTest.java         | 11 ++-
 .../compression/CompressedInputStreamTest.java  | 12 +--
 25 files changed, 310 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 707ea6b..2dc2021 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
 4.0
+ * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)
 * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889)
- * Rename internals to reflect CQL vocabulary
-   (CASSANDRA-14354)
+ * Rename internals to reflect CQL vocabulary (CASSANDRA-14354)
  * Add support for hybrid MIN(), MAX() speculative retry policies
    (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352)
  * Fix some regressions caused by 14058 (CASSANDRA-14353)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4c546dd..bfab6ea 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1451,9 +1451,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
         for (SSTableReader sstable : sstables)
         {
-            List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(ranges);
-            for (Pair<Long, Long> position : positions)
-                expectedFileSize += position.right - position.left;
+            List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(ranges);
+            for (SSTableReader.PartitionPositionBounds position : positions)
+                expectedFileSize += position.upperPosition - position.lowerPosition;
         }
 
         double compressionRatio = metric.compressionRatio.getValue();
@@ -1965,7 +1965,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return  Return a map of all snapshots to space being used
      * The pair for a snapshot has true size and size on disk.
      */
-    public Map<String, Pair<Long,Long>> getSnapshotDetails()
+    public Map<String, Directories.SnapshotSizeDetails> getSnapshotDetails()
     {
         return getDirectories().getSnapshotDetails();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 2737d0c..1fc071a 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -784,19 +784,19 @@ public class Directories
      * @return  Return a map of all snapshots to space being used
      * The pair for a snapshot has size on disk and true size.
      */
-    public Map<String, Pair<Long, Long>> getSnapshotDetails()
+    public Map<String, SnapshotSizeDetails> getSnapshotDetails()
     {
         List<File> snapshots = listSnapshots();
-        final Map<String, Pair<Long, Long>> snapshotSpaceMap = Maps.newHashMapWithExpectedSize(snapshots.size());
+        final Map<String, SnapshotSizeDetails> snapshotSpaceMap = Maps.newHashMapWithExpectedSize(snapshots.size());
         for (File snapshot : snapshots)
         {
             final long sizeOnDisk = FileUtils.folderSize(snapshot);
             final long trueSize = getTrueAllocatedSizeIn(snapshot);
-            Pair<Long, Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName());
+            SnapshotSizeDetails spaceUsed = snapshotSpaceMap.get(snapshot.getName());
             if (spaceUsed == null)
-                spaceUsed =  Pair.create(sizeOnDisk,trueSize);
+                spaceUsed =  new SnapshotSizeDetails(sizeOnDisk,trueSize);
             else
-                spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize);
+                spaceUsed = new SnapshotSizeDetails(spaceUsed.sizeOnDiskBytes + sizeOnDisk, spaceUsed.dataSizeBytes + trueSize);
             snapshotSpaceMap.put(snapshot.getName(), spaceUsed);
         }
         return snapshotSpaceMap;
@@ -1032,4 +1032,32 @@ public class Directories
                 && !toSkip.contains(file);
         }
     }
+
+    public static class SnapshotSizeDetails
+    {
+        final long sizeOnDiskBytes;
+        final long dataSizeBytes;
+
+        private SnapshotSizeDetails(long sizeOnDiskBytes, long dataSizeBytes)
+        {
+            this.sizeOnDiskBytes = sizeOnDiskBytes;
+            this.dataSizeBytes = dataSizeBytes;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            int hashCode = (int) sizeOnDiskBytes ^ (int) (sizeOnDiskBytes >>> 32);
+            return 31 * (hashCode ^ (int) ((int) dataSizeBytes ^ (dataSizeBytes >>> 32)));
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if(!(o instanceof SnapshotSizeDetails))
+                return false;
+            SnapshotSizeDetails that = (SnapshotSizeDetails)o;
+            return sizeOnDiskBytes == that.sizeOnDiskBytes && dataSizeBytes == that.dataSizeBytes;
+        }
+    }
 }
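
[Editor's note] The hand-rolled hashCode above mixes each field exactly as Long.hashCode(long) does, i.e. (int) (v ^ (v >>> 32)); an equivalent sketch (hypothetical helper, not part of the patch):

    static int snapshotSizeHash(long sizeOnDiskBytes, long dataSizeBytes)
    {
        int h = Long.hashCode(sizeOnDiskBytes);          // same as the manual xor/shift
        return 31 * (h ^ Long.hashCode(dataSizeBytes));
    }

Objects.hash(sizeOnDiskBytes, dataSizeBytes) would be shorter, but it boxes both longs into a varargs Object[], which is exactly the overhead this patch removes.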

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java b/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java
index 97caea1..5ef729a 100644
--- a/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java
+++ b/src/java/org/apache/cassandra/db/SnapshotDetailsTabularData.java
@@ -69,12 +69,12 @@ public class SnapshotDetailsTabularData
     }
 
 
-    public static void from(final String snapshot, final String ks, final String cf, Map.Entry<String, Pair<Long,Long>> snapshotDetail, TabularDataSupport result)
+    public static void from(final String snapshot, final String ks, final String cf, Map.Entry<String, Directories.SnapshotSizeDetails> snapshotDetail, TabularDataSupport result)
     {
         try
         {
-            final String totalSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().left);
-            final String liveSize =  FileUtils.stringifyFileSize(snapshotDetail.getValue().right);
+            final String totalSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().sizeOnDiskBytes);
+            final String liveSize =  FileUtils.stringifyFileSize(snapshotDetail.getValue().dataSizeBytes);
             result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
                     new Object[]{ snapshot, ks, cf, liveSize, totalSize }));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index 992a384..6fa0be2 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -243,8 +243,8 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
         long estimatedTotalBytes = 0;
         for (SSTableReader sstable : sstables)
         {
-            for (Pair<Long, Long> positionsForRanges : sstable.getPositionsForRanges(ranges))
-                estimatedTotalBytes += positionsForRanges.right - positionsForRanges.left;
+            for (SSTableReader.PartitionPositionBounds positionsForRanges : sstable.getPositionsForRanges(ranges))
+                estimatedTotalBytes += positionsForRanges.upperPosition - positionsForRanges.lowerPosition;
         }
         estimatedBytes = estimatedTotalBytes;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 3fd3f9d..6ec1f85 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.OutgoingStream;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 /**
@@ -42,12 +41,12 @@ public class CassandraOutgoingFile implements OutgoingStream
 {
     private final Ref<SSTableReader> ref;
     private final long estimatedKeys;
-    private final List<Pair<Long, Long>> sections;
+    private final List<SSTableReader.PartitionPositionBounds> sections;
     private final String filename;
     private final CassandraStreamHeader header;
     private final boolean keepSSTableLevel;
 
-    public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys)
+    public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<SSTableReader.PartitionPositionBounds> sections, long estimatedKeys)
     {
         Preconditions.checkNotNull(ref.get());
         this.ref = ref;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
index 2603da1..43631b0 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
@@ -28,10 +28,10 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.Pair;
 
 public class CassandraStreamHeader
 {
@@ -41,7 +41,7 @@ public class CassandraStreamHeader
     /** SSTable format **/
     public final SSTableFormat.Type format;
     public final long estimatedKeys;
-    public final List<Pair<Long, Long>> sections;
+    public final List<SSTableReader.PartitionPositionBounds> sections;
     /**
      * Compression info for SSTable to send. Can be null if SSTable is not compressed.
      * On sender, this field is always null to avoid holding large number of Chunks.
@@ -55,7 +55,7 @@ public class CassandraStreamHeader
     /* cached size value */
     private transient final long size;
 
-    private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
+    private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
     {
         this.version = version;
         this.format = format;
@@ -69,12 +69,12 @@ public class CassandraStreamHeader
         this.size = calculateSize();
     }
 
-    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header)
+    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header)
     {
         this(version, format, estimatedKeys, sections, compressionMetadata, null, sstableLevel, header);
     }
 
-    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
+    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
     {
         this(version, format, estimatedKeys, sections, null, compressionInfo, sstableLevel, header);
     }
@@ -103,8 +103,8 @@ public class CassandraStreamHeader
         }
         else
         {
-            for (Pair<Long, Long> section : sections)
-                transferSize += section.right - section.left;
+            for (SSTableReader.PartitionPositionBounds section : sections)
+                transferSize += section.upperPosition - section.lowerPosition;
         }
         return transferSize;
     }
@@ -160,10 +160,10 @@ public class CassandraStreamHeader
 
             out.writeLong(header.estimatedKeys);
             out.writeInt(header.sections.size());
-            for (Pair<Long, Long> section : header.sections)
+            for (SSTableReader.PartitionPositionBounds section : header.sections)
             {
-                out.writeLong(section.left);
-                out.writeLong(section.right);
+                out.writeLong(section.lowerPosition);
+                out.writeLong(section.upperPosition);
             }
             header.calculateCompressionInfo();
             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
@@ -178,9 +178,9 @@
 
             long estimatedKeys = in.readLong();
             int count = in.readInt();
-            List<Pair<Long, Long>> sections = new ArrayList<>(count);
+            List<SSTableReader.PartitionPositionBounds> sections = new ArrayList<>(count);
             for (int k = 0; k < count; k++)
-                sections.add(Pair.create(in.readLong(), in.readLong()));
+                sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong()));
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version);
             int sstableLevel = in.readInt();
             SerializationHeader.Component header =  SerializationHeader.serializer.deserialize(sstableVersion, in);
@@ -196,10 +196,10 @@
             size += TypeSizes.sizeof(header.estimatedKeys);
 
             size += TypeSizes.sizeof(header.sections.size());
-            for (Pair<Long, Long> section : header.sections)
+            for (SSTableReader.PartitionPositionBounds section : header.sections)
             {
-                size += TypeSizes.sizeof(section.left);
-                size += TypeSizes.sizeof(section.right);
+                size += TypeSizes.sizeof(section.lowerPosition);
+                size += TypeSizes.sizeof(section.upperPosition);
             }
             size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
             size += TypeSizes.sizeof(header.sstableLevel);
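
[Editor's note] On the wire the section list is simply a count followed by one (lowerPosition, upperPosition) pair of longs per section. A standalone round-trip sketch with plain java.io streams, using long[2] in place of PartitionPositionBounds (hypothetical, not from the patch):

    static void writeSections(java.io.DataOutput out, java.util.List<long[]> sections) throws java.io.IOException
    {
        out.writeInt(sections.size());
        for (long[] s : sections)
        {
            out.writeLong(s[0]); // lowerPosition
            out.writeLong(s[1]); // upperPosition
        }
    }

    static java.util.List<long[]> readSections(java.io.DataInput in) throws java.io.IOException
    {
        int count = in.readInt();
        java.util.List<long[]> sections = new java.util.ArrayList<>(count);
        for (int k = 0; k < count; k++)
            sections.add(new long[]{ in.readLong(), in.readLong() });
        return sections;
    }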

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index 466fa36..673b62c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -146,7 +146,7 @@ public class CassandraStreamManager implements TableStreamManager
             for (SSTableReader sstable: refs)
             {
                 Ref<SSTableReader> ref = refs.get(sstable);
-                List<Pair<Long, Long>> sections = sstable.getPositionsForRanges(ranges);
+                List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges);
                 if (sections.isEmpty())
                 {
                     ref.release();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 26ef5ed..8e7b198 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.TrackedDataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -48,7 +49,6 @@ import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * CassandraStreamReader reads from stream and writes to SSTable.
@@ -58,7 +58,7 @@ public class CassandraStreamReader
     private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class);
     protected final TableId tableId;
     protected final long estimatedKeys;
-    protected final Collection<Pair<Long, Long>> sections;
+    protected final Collection<SSTableReader.PartitionPositionBounds> sections;
     protected final StreamSession session;
     protected final Version inputVersion;
     protected final long repairedAt;
@@ -162,8 +162,8 @@ public class CassandraStreamReader
     protected long totalSize()
     {
         long size = 0;
-        for (Pair<Long, Long> section : sections)
-            size += section.right - section.left;
+        for (SSTableReader.PartitionPositionBounds section : sections)
+            size += section.upperPosition - section.lowerPosition;
         return size;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index b86f99a..c6dd9a9 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * CassandraStreamWriter writes given section of the SSTable to given channel.
@@ -50,11 +49,11 @@ public class CassandraStreamWriter
     private static final Logger logger = LoggerFactory.getLogger(CassandraStreamWriter.class);
 
     protected final SSTableReader sstable;
-    protected final Collection<Pair<Long, Long>> sections;
+    protected final Collection<SSTableReader.PartitionPositionBounds> sections;
     protected final StreamRateLimiter limiter;
     protected final StreamSession session;
 
-    public CassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session)
+    public CassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, StreamSession session)
     {
         this.session = session;
         this.sstable = sstable;
@@ -89,16 +88,16 @@ public class CassandraStreamWriter
             try (DataOutputStreamPlus compressedOutput = new ByteBufCompressionDataOutputStreamPlus(output, limiter))
             {
                 // stream each of the required sections of the file
-                for (Pair<Long, Long> section : sections)
+                for (SSTableReader.PartitionPositionBounds section : sections)
                 {
-                    long start = validator == null ? section.left : validator.chunkStart(section.left);
+                    long start = validator == null ? section.lowerPosition : validator.chunkStart(section.lowerPosition);
                     // if the transfer does not start on the validator's chunk boundary, this is the number of bytes to offset by
-                    int transferOffset = (int) (section.left - start);
+                    int transferOffset = (int) (section.lowerPosition - start);
                     if (validator != null)
                         validator.seek(start);
 
                     // length of the section to read
-                    long length = section.right - start;
+                    long length = section.upperPosition - start;
                     // tracks write progress
                     long bytesRead = 0;
                     while (bytesRead < length)
@@ -124,8 +123,8 @@ public class CassandraStreamWriter
     protected long totalSize()
     {
         long size = 0;
-        for (Pair<Long, Long> section : sections)
-            size += section.right - section.left;
+        for (SSTableReader.PartitionPositionBounds section : sections)
+            size += section.upperPosition - section.lowerPosition;
         return size;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
index 343d7ed..c71edfb 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.TrackedDataInputPlus;
 import org.apache.cassandra.streaming.ProgressInfo;
@@ -33,7 +34,6 @@ import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
 
@@ -83,14 +83,14 @@ public class CompressedCassandraStreamReader extends CassandraStreamReader
             writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
             String filename = writer.getFilename();
             int sectionIdx = 0;
-            for (Pair<Long, Long> section : sections)
+            for (SSTableReader.PartitionPositionBounds section : sections)
             {
                 assert cis.getTotalCompressedBytesRead() <= totalSize;
-                long sectionLength = section.right - section.left;
+                long sectionLength = section.upperPosition - section.lowerPosition;
 
                 logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
                 // skip to beginning of section inside chunk
-                cis.position(section.left);
+                cis.position(section.lowerPosition);
                 in.reset(0);
 
                 while (in.getBytesRead() < sectionLength)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
index 3fcbc38..c5b0c53 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * CassandraStreamWriter for compressed SSTable.
@@ -49,7 +48,7 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter
 
     private final CompressionInfo compressionInfo;
 
-    public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
+    public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
     {
         super(sstable, sections, session);
         this.compressionInfo = compressionInfo;
@@ -67,15 +66,15 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter
         {
             long progress = 0L;
             // calculate chunks to transfer. we want to send continuous chunks altogether.
-            List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+            List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks);
 
             int sectionIdx = 0;
 
             // stream each of the required sections of the file
-            for (final Pair<Long, Long> section : sections)
+            for (final SSTableReader.PartitionPositionBounds section : sections)
             {
                 // length of the section to stream
-                long length = section.right - section.left;
+                long length = section.upperPosition - section.lowerPosition;
 
                 logger.trace("[Stream #{}] Writing section {} with length {} 
to stream.", session.planId(), sectionIdx++, length);
 
@@ -90,7 +89,7 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter
                     long lastWrite;
                     try
                     {
-                        lastWrite = fc.read(outBuffer, section.left + bytesTransferred);
+                        lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred);
                         assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
                         outBuffer.flip();
                         output.writeToChannel(outBuffer);
@@ -122,28 +121,28 @@ public class CompressedCassandraStreamWriter extends CassandraStreamWriter
     }
 
     // chunks are assumed to be sorted by offset
-    private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks)
+    private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks)
     {
-        List<Pair<Long, Long>> transferSections = new ArrayList<>();
-        Pair<Long, Long> lastSection = null;
+        List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>();
+        SSTableReader.PartitionPositionBounds lastSection = null;
         for (CompressionMetadata.Chunk chunk : chunks)
         {
             if (lastSection != null)
             {
-                if (chunk.offset == lastSection.right)
+                if (chunk.offset == lastSection.upperPosition)
                 {
                     // extend previous section to end of this chunk
-                    lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC
+                    lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC
                 }
                 else
                 {
                     transferSections.add(lastSection);
-                    lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+                    lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
                 }
             }
             else
             {
-                lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+                lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
             }
         }
         if (lastSection != null)
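
[Editor's note] getTransferSections coalesces compressed chunks that abut on disk: each chunk occupies offset..offset+length+4 (4 trailing CRC bytes), and a chunk starting exactly at the previous section's upper bound extends that section instead of opening a new one. The same merge over (offset, length) pairs, sketched with long[2] sections (hypothetical, not from the patch):

    // chunks sorted by offset; returns contiguous [lower, upper) sections
    static java.util.List<long[]> mergeChunks(long[][] chunks)
    {
        java.util.List<long[]> sections = new java.util.ArrayList<>();
        long[] last = null;
        for (long[] c : chunks)
        {
            long end = c[0] + c[1] + 4;      // chunk end including its CRC
            if (last != null && c[0] == last[1])
                last[1] = end;               // contiguous: extend the open section
            else
                sections.add(last = new long[]{ c[0], end });
        }
        return sections;
    }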

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
index 0f0d5c7..aef57e3 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
@@ -23,10 +23,10 @@ import java.util.List;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Container that carries compression parameters and chunks to decompress data from stream.
@@ -45,7 +45,7 @@ public class CompressionInfo
         this.parameters = parameters;
     }
 
-    static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<Pair<Long, Long>> sections)
+    static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<SSTableReader.PartitionPositionBounds> sections)
     {
         if (metadata == null)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index fa0f9f7..e6ef5c3 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -49,12 +49,12 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.SafeMemory;
 import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SyncUtil;
 import org.apache.cassandra.utils.concurrent.Transactional;
 import org.apache.cassandra.utils.concurrent.Ref;
@@ -262,15 +262,15 @@ public class CompressionMetadata
      * @param sections Collection of sections in uncompressed file. Should not contain sections that overlap each other.
      * @return Total chunk size in bytes for given sections including checksum.
      */
-    public long getTotalSizeForSections(Collection<Pair<Long, Long>> sections)
+    public long getTotalSizeForSections(Collection<SSTableReader.PartitionPositionBounds> sections)
     {
         long size = 0;
         long lastOffset = -1;
-        for (Pair<Long, Long> section : sections)
+        for (SSTableReader.PartitionPositionBounds section : sections)
         {
-            int startIndex = (int) (section.left / parameters.chunkLength());
-            int endIndex = (int) (section.right / parameters.chunkLength());
-            endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
+            int startIndex = (int) (section.lowerPosition / parameters.chunkLength());
+            int endIndex = (int) (section.upperPosition / parameters.chunkLength());
+            endIndex = section.upperPosition % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
             for (int i = startIndex; i <= endIndex; i++)
             {
                 long offset = i * 8L;
@@ -292,7 +292,7 @@ public class CompressionMetadata
      * @param sections Collection of sections in uncompressed file
      * @return Array of chunks which corresponds to given sections of uncompressed file, sorted by chunk offset
      */
-    public Chunk[] getChunksForSections(Collection<Pair<Long, Long>> sections)
+    public Chunk[] getChunksForSections(Collection<SSTableReader.PartitionPositionBounds> sections)
     {
         // use SortedSet to eliminate duplicates and sort by chunk offset
         SortedSet<Chunk> offsets = new TreeSet<Chunk>(new Comparator<Chunk>()
@@ -302,11 +302,11 @@ public class CompressionMetadata
                 return Longs.compare(o1.offset, o2.offset);
             }
         });
-        for (Pair<Long, Long> section : sections)
+        for (SSTableReader.PartitionPositionBounds section : sections)
         {
-            int startIndex = (int) (section.left / parameters.chunkLength());
-            int endIndex = (int) (section.right / parameters.chunkLength());
-            endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
+            int startIndex = (int) (section.lowerPosition / parameters.chunkLength());
+            int endIndex = (int) (section.upperPosition / parameters.chunkLength());
+            endIndex = section.upperPosition % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
             for (int i = startIndex; i <= endIndex; i++)
             {
                 long offset = i * 8L;
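
[Editor's note] Both methods map an uncompressed byte section onto chunk indexes: startIndex is the chunk containing lowerPosition, endIndex the chunk containing upperPosition, stepped back by one when upperPosition falls exactly on a chunk boundary (the upper bound is exclusive). A worked sketch (hypothetical helper):

    // e.g. chunkLength = 65536: section [0, 131072) covers chunks 0..1,
    // because 131072 % 65536 == 0 pulls endIndex back from 2 to 1
    static int[] chunkIndexRange(long lowerPosition, long upperPosition, int chunkLength)
    {
        int startIndex = (int) (lowerPosition / chunkLength);
        int endIndex = (int) (upperPosition / chunkLength);
        if (upperPosition % chunkLength == 0)
            endIndex--;
        return new int[]{ startIndex, endIndex };
    }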

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index a6985f7..ebc35e7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -128,7 +128,7 @@ public class SSTableLoader implements StreamEventHandler
                                                  InetAddressAndPort endpoint = entry.getKey();
                                                  Collection<Range<Token>> tokenRanges = entry.getValue();
 
-                                                  List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
+                                                  List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                                                  long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
                                                  Ref<SSTableReader> ref = sstable.ref();
                                                  OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index fe1695a..dd93e5f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1354,9 +1354,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
     {
         long sampleKeyCount = 0;
-        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
-        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
-            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
+        List<IndexesBounds> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
+        for (IndexesBounds sampleIndexRange : sampleIndexes)
+            sampleKeyCount += (sampleIndexRange.upperPosition - sampleIndexRange.lowerPosition + 1);
 
         // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
         long estimatedKeys = sampleKeyCount * ((long) Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
@@ -1388,10 +1388,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return indexSummary.getKey(index);
     }
 
-    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
+    private static List<IndexesBounds> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
-        List<Pair<Integer,Integer>> positions = new ArrayList<>();
+        List<IndexesBounds> positions = new ArrayList<>();
 
         for (Range<Token> range : Range.normalize(ranges))
         {
@@ -1425,14 +1425,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             if (left > right)
                 // empty range
                 continue;
-            positions.add(Pair.create(left, right));
+            positions.add(new IndexesBounds(left, right));
         }
         return positions;
     }
 
     public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
     {
-        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
+        final List<IndexesBounds> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 
         if (indexRanges.isEmpty())
             return Collections.emptyList();
@@ -1443,18 +1443,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             {
                 return new Iterator<DecoratedKey>()
                 {
-                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
-                    private Pair<Integer, Integer> current;
+                    private Iterator<IndexesBounds> rangeIter = indexRanges.iterator();
+                    private IndexesBounds current;
                     private int idx;
 
                     public boolean hasNext()
                     {
-                        if (current == null || idx > current.right)
+                        if (current == null || idx > current.upperPosition)
                         {
                             if (rangeIter.hasNext())
                             {
                                 current = rangeIter.next();
-                                idx = current.left;
+                                idx = current.lowerPosition;
                                 return true;
                             }
                             return false;
@@ -1482,10 +1482,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
      * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
      */
-    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
+    public List<PartitionPositionBounds> getPositionsForRanges(Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
-        List<Pair<Long,Long>> positions = new ArrayList<>();
+        List<PartitionPositionBounds> positions = new ArrayList<>();
         for (Range<Token> range : Range.normalize(ranges))
         {
             assert !range.isWrapAround() || range.right.isMinimum();
@@ -1507,7 +1507,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 continue;
 
             assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
-            positions.add(Pair.create(left, right));
+            positions.add(new PartitionPositionBounds(left, right));
         }
         return positions;
     }
@@ -2362,4 +2362,59 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                            SerializationHeader header);
 
     }
+
+    public static class PartitionPositionBounds
+    {
+        public final long lowerPosition;
+        public final long upperPosition;
+
+        public PartitionPositionBounds(long lower, long upper)
+        {
+            this.lowerPosition = lower;
+            this.upperPosition = upper;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            int hashCode = (int) lowerPosition ^ (int) (lowerPosition >>> 32);
+            return 31 * (hashCode ^ (int) ((int) upperPosition ^ (upperPosition >>> 32)));
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if(!(o instanceof PartitionPositionBounds))
+                return false;
+            PartitionPositionBounds that = (PartitionPositionBounds)o;
+            return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
+        }
+    }
+
+    public static class IndexesBounds
+    {
+        public final int lowerPosition;
+        public final int upperPosition;
+
+        public IndexesBounds(int lower, int upper)
+        {
+            this.lowerPosition = lower;
+            this.upperPosition = upper;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            return 31 * lowerPosition * upperPosition;
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if (!(o instanceof IndexesBounds))
+                return false;
+            IndexesBounds that = (IndexesBounds) o;
+            return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
+        }
+    }
 }
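
[Editor's note] Call sites traverse the returned bounds with plain long arithmetic, with no Long allocation or unboxing; a minimal usage sketch against the new API (variable names hypothetical):

    long totalBytes = 0;
    for (SSTableReader.PartitionPositionBounds bounds : sstable.getPositionsForRanges(ranges))
        totalBytes += bounds.upperPosition - bounds.lowerPosition;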

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index b01573c..cb25c6b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -42,7 +42,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.dht.AbstractBounds.isEmpty;
 import static org.apache.cassandra.dht.AbstractBounds.maxLeft;
@@ -84,7 +83,7 @@ public class BigTableScanner implements ISSTableScanner
     public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
     {
         // We want to avoid allocating a SSTableScanner if the ranges don't overlap the sstable (#5249)
-        List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(tokenRanges);
+        List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(tokenRanges);
         if (positions.isEmpty())
             return new EmptySSTableScanner(sstable);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 236a770..30968df 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -215,7 +215,7 @@ public class MessageOut<T>
         }
     }
 
-    private Pair<Long, Long> calculateSerializedSize(int version)
+    private MessageOutSizes calculateSerializedSize(int version)
     {
         long size = 0;
         size += CompactEndpointSerializationHelper.instance.serializedSize(from, version);
@@ -236,7 +236,7 @@ public class MessageOut<T>
         assert payloadSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
         size += TypeSizes.sizeof((int) payloadSize);
         size += payloadSize;
-        return Pair.create(size, payloadSize);
+        return new MessageOutSizes(size, payloadSize);
     }
 
     /**
@@ -258,18 +258,18 @@ public class MessageOut<T>
         if (serializedSize > 0 && serializedSizeVersion == version)
             return serializedSize;
 
-        Pair<Long, Long> sizes = calculateSerializedSize(version);
-        if (sizes.left > Integer.MAX_VALUE)
-            throw new IllegalStateException("message size exceeds maximum allowed size: size = " + sizes.left);
+        MessageOutSizes sizes = calculateSerializedSize(version);
+        if (sizes.messageSize > Integer.MAX_VALUE)
+            throw new IllegalStateException("message size exceeds maximum allowed size: size = " + sizes.messageSize);
 
         if (serializedSizeVersion == SERIALIZED_SIZE_VERSION_UNDEFINED)
         {
-            serializedSize = sizes.left.intValue();
-            payloadSerializedSize = sizes.right.intValue();
+            serializedSize = Ints.checkedCast(sizes.messageSize);
+            payloadSerializedSize = Ints.checkedCast(sizes.payloadSize);
             serializedSizeVersion = version;
         }
 
-        return sizes.left.intValue();
+        return Ints.checkedCast(sizes.messageSize);
     }
 
     public Object getParameter(ParameterType type)
@@ -283,4 +283,32 @@ public class MessageOut<T>
         }
         return null;
     }
+
+    private static class MessageOutSizes
+    {
+        public final long messageSize;
+        public final long payloadSize;
+
+        private MessageOutSizes(long messageSize, long payloadSize)
+        {
+            this.messageSize = messageSize;
+            this.payloadSize = payloadSize;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            int hashCode = (int) messageSize ^ (int) (messageSize >>> 32);
+            return 31 * (hashCode ^ (int) ((int) payloadSize ^ (payloadSize >>> 32)));
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if (!(o instanceof MessageOutSizes))
+                return false;
+            MessageOutSizes that = (MessageOutSizes) o;
+            return messageSize == that.messageSize && payloadSize == that.payloadSize;
+        }
+    }
 }
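
[Editor's note] Beyond removing the boxed Pair, replacing Long.intValue() with Guava's Ints.checkedCast also tightens the narrowing: intValue() silently truncates to the low 32 bits, while checkedCast throws IllegalArgumentException on overflow. A quick standalone illustration (not from the patch):

    long big = Integer.MAX_VALUE + 1L;                                // 2147483648
    int truncated = (int) big;                                        // -2147483648, what intValue() yields
    int checked = com.google.common.primitives.Ints.checkedCast(big); // throws IllegalArgumentException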

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index bacc3a8..7f0837b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -250,13 +250,13 @@ public class StorageProxy implements StorageProxyMBean
             while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the start of each attempt
-                Pair<List<InetAddressAndPort>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
-                List<InetAddressAndPort> liveEndpoints = p.left;
-                int requiredParticipants = p.right;
+                PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyForPaxos);
+                List<InetAddressAndPort> liveEndpoints = p.liveEndpoints;
+                int requiredParticipants = p.participants;
 
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
-                final UUID ballot = pair.left;
-                contentions += pair.right;
+                final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
+                final UUID ballot = pair.ballot;
+                contentions += pair.contentions;
 
                 // read the current values and check they validate the conditions
                 Tracing.trace("Reading existing values for CAS precondition");
@@ -356,7 +356,7 @@ public class StorageProxy implements StorageProxyMBean
         };
     }
 
-    private static Pair<List<InetAddressAndPort>, Integer> getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+    private static PaxosParticipants getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
     {
         Token tk = key.getToken();
         List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
@@ -384,7 +384,7 @@ public class StorageProxy implements StorageProxyMBean
                                            participants + 1,
                                            liveEndpoints.size());
 
-        return Pair.create(liveEndpoints, requiredParticipants);
+        return new PaxosParticipants(liveEndpoints, requiredParticipants);
     }
 
     /**
@@ -393,7 +393,7 @@ public class StorageProxy implements StorageProxyMBean
      * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
-    private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime,
+    private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime,
                                                            DecoratedKey key,
                                                            TableMetadata metadata,
                                                            List<InetAddressAndPort> liveEndpoints,
@@ -486,7 +486,7 @@ public class StorageProxy implements StorageProxyMBean
                 continue;
             }
 
-            return Pair.create(ballot, contentions);
+            return new PaxosBallotAndContention(ballot, contentions);
         }
 
         recordCasContention(contentions);
@@ -1662,9 +1662,9 @@ public class StorageProxy implements StorageProxyMBean
         try
         {
             // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
-            Pair<List<InetAddressAndPort>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel);
-            List<InetAddressAndPort> liveEndpoints = p.left;
-            int requiredParticipants = p.right;
+            PaxosParticipants p = getPaxosParticipants(metadata, key, consistencyLevel);
+            List<InetAddressAndPort> liveEndpoints = p.liveEndpoints;
+            int requiredParticipants = p.participants;
 
             // does the work of applying in-progress writes; throws UAE or timeout if it can't
             final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
@@ -1673,9 +1673,9 @@ public class StorageProxy implements StorageProxyMBean
 
             try
             {
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state);
-                if (pair.right > 0)
-                    casReadMetrics.contention.update(pair.right);
+                final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state);
+                if (pair.contentions > 0)
+                    casReadMetrics.contention.update(pair.contentions);
             }
             catch (WriteTimeoutException e)
             {
@@ -2829,4 +2829,63 @@ public class StorageProxy implements StorageProxyMBean
     public void setOtcBacklogExpirationInterval(int intervalInMillis) {
         DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
     }
+
+
+    static class PaxosParticipants
+    {
+        final List<InetAddressAndPort> liveEndpoints;
+        final int participants;
+
+        PaxosParticipants(List<InetAddressAndPort> liveEndpoints, int participants)
+        {
+            this.liveEndpoints = liveEndpoints;
+            this.participants = participants;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            int hashCode = 31 + (liveEndpoints == null ? 0 : liveEndpoints.hashCode());
+            return 31 * hashCode + this.participants;
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if (!(o instanceof PaxosParticipants))
+                return false;
+            PaxosParticipants that = (PaxosParticipants)o;
+            // handles nulls properly
+            return Objects.equals(liveEndpoints, that.liveEndpoints) && participants == that.participants;
+        }
+    }
+
+    static class PaxosBallotAndContention
+    {
+        final UUID ballot;
+        final int contentions;
+
+        PaxosBallotAndContention(UUID ballot, int contentions)
+        {
+            this.ballot = ballot;
+            this.contentions = contentions;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode());
+            return 31 * hashCode + this.contentions;
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if (!(o instanceof PaxosBallotAndContention))
+                return false;
+            PaxosBallotAndContention that = (PaxosBallotAndContention)o;
+            // handles nulls properly
+            return Objects.equals(ballot, that.ballot) && contentions == that.contentions;
+        }
+    }
 }

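For readers skimming the hunks above: the payoff of replacing Pair<UUID, Integer> with a dedicated value class is that the contention count stays a primitive int instead of being boxed on every create and unboxed on every read. A minimal before/after sketch of a call site, using only the names this commit introduces (illustrative, not a new API):

    // Before: Pair.create autoboxes the int into an Integer; every read unboxes it.
    Pair<UUID, Integer> boxed = Pair.create(ballot, contentions);
    int fromPair = boxed.right;              // unboxing conversion, extra allocation earlier

    // After: the primitive field is written and read directly, with no allocation.
    PaxosBallotAndContention direct = new PaxosBallotAndContention(ballot, contentions);
    int fromClass = direct.contentions;      // plain int field access
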
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8e73439..91206c1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3448,7 +3448,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
             for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores())
             {
-                for (Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet())
+                for (Map.Entry<String, Directories.SnapshotSizeDetails> snapshotDetail : cfStore.getSnapshotDetails().entrySet())
                 {
                    TabularDataSupport data = (TabularDataSupport)snapshotMap.get(snapshotDetail.getKey());
                     if (data == null)

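A minimal sketch of how the StorageService loop above consumes the replacement type. The field name dataSizeBytes is the one visible in the DirectoriesTest hunk further down; the other component of the old Pair<Long,Long> is not shown in this excerpt, so it is omitted here:

    for (Map.Entry<String, Directories.SnapshotSizeDetails> snapshotDetail :
         cfStore.getSnapshotDetails().entrySet())
    {
        String snapshotName = snapshotDetail.getKey();
        // one primitive long per entry instead of a boxed Pair component
        long dataSize = snapshotDetail.getValue().dataSizeBytes;
    }
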
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index b553a12..888cdc6 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -49,7 +49,6 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import static junit.framework.Assert.assertNotNull;
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -240,7 +239,7 @@ public class ColumnFamilyStoreTest
         cfs.snapshot("nonEphemeralSnapshot", null, false, false);
         cfs.snapshot("ephemeralSnapshot", null, true, false);
 
-        Map<String, Pair<Long, Long>> snapshotDetails = cfs.getSnapshotDetails();
+        Map<String, Directories.SnapshotSizeDetails> snapshotDetails = cfs.getSnapshotDetails();
         assertEquals(2, snapshotDetails.size());
         assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
         assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index ffd4d3e..bbbb4e6 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -46,7 +46,6 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.DefaultFSErrorHandler;
-import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -216,13 +215,13 @@ public class DirectoriesTest
         assertEquals(40, indexDirectories.trueSnapshotsSize());
 
         // check snapshot details
-        Map<String, Pair<Long, Long>> parentSnapshotDetail = parentDirectories.getSnapshotDetails();
+        Map<String, Directories.SnapshotSizeDetails> parentSnapshotDetail = parentDirectories.getSnapshotDetails();
         assertTrue(parentSnapshotDetail.containsKey("test"));
-        assertEquals(30L, parentSnapshotDetail.get("test").right.longValue());
+        assertEquals(30L, parentSnapshotDetail.get("test").dataSizeBytes);
 
-        Map<String, Pair<Long, Long>> indexSnapshotDetail = indexDirectories.getSnapshotDetails();
+        Map<String, Directories.SnapshotSizeDetails> indexSnapshotDetail = indexDirectories.getSnapshotDetails();
         assertTrue(indexSnapshotDetail.containsKey("test"));
-        assertEquals(40L, indexSnapshotDetail.get("test").right.longValue());
+        assertEquals(40L, indexSnapshotDetail.get("test").dataSizeBytes);
 
         // check backup directory
         File parentBackupDirectory = Directories.getBackupsDirectory(parentDesc);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 0ebf0a2..6f94696 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -55,7 +55,6 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FilterFactory;
-import org.apache.cassandra.utils.Pair;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 import static org.junit.Assert.assertEquals;
@@ -127,11 +126,11 @@ public class SSTableReaderTest
         // confirm that positions increase continuously
         SSTableReader sstable = store.getLiveSSTables().iterator().next();
         long previous = -1;
-        for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges))
+        for (SSTableReader.PartitionPositionBounds section : sstable.getPositionsForRanges(ranges))
         {
-            assert previous <= section.left : previous + " ! < " + section.left;
-            assert section.left < section.right : section.left + " ! < " + section.right;
-            previous = section.right;
+            assert previous <= section.lowerPosition : previous + " ! < " + section.lowerPosition;
+            assert section.lowerPosition < section.upperPosition : section.lowerPosition + " ! < " + section.upperPosition;
+            previous = section.upperPosition;
         }
     }
 
@@ -270,13 +269,13 @@ public class SSTableReaderTest
         long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position;
         long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position;
 
-        Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0);
+        SSTableReader.PartitionPositionBounds p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0);
 
         // ranges are start-exclusive so we should start at 3
-        assert p.left == p3;
+        assert p.lowerPosition == p3;
 
         // to capture 6 we have to stop at the start of 7
-        assert p.right == p7;
+        assert p.upperPosition == p7;
     }
 
     @Test
@@ -537,7 +536,7 @@ public class SSTableReaderTest
         ranges.add(new Range<Token>(t(98), t(99)));
 
         SSTableReader sstable = store.getLiveSSTables().iterator().next();
-        List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
+        List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges);
         assert sections.size() == 1 : "Expected to find range in sstable";
 
         // re-open the same sstable as it would be during bulk loading

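The renamed bounds also make downstream arithmetic read naturally. A minimal sketch of totaling the bytes covered by a set of ranges, in the style of the stream writers touched earlier in this commit (illustrative; it assumes only the getPositionsForRanges call and the two fields shown in these hunks):

    long totalSize = 0;
    for (SSTableReader.PartitionPositionBounds section : sstable.getPositionsForRanges(ranges))
        totalSize += section.upperPosition - section.lowerPosition;   // primitive longs, no unboxing
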
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 5c3e8c9..6412ef4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.*;
@@ -172,14 +171,14 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                         {
                             SSTableReader c = txn.current(sstables.iterator().next());
                             Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.getPartitioner().getMinimumToken(), cfs.getPartitioner().getMinimumToken()));
-                            List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r);
-                            List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r);
+                            List<SSTableReader.PartitionPositionBounds> tmplinkPositions = sstable.getPositionsForRanges(r);
+                            List<SSTableReader.PartitionPositionBounds> compactingPositions = c.getPositionsForRanges(r);
                             assertEquals(1, tmplinkPositions.size());
                             assertEquals(1, compactingPositions.size());
-                            assertEquals(0, tmplinkPositions.get(0).left.longValue());
+                            assertEquals(0, tmplinkPositions.get(0).lowerPosition);
                             // make sure we have no overlap between the early opened file and the compacting one:
-                            assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left.longValue());
-                            assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue());
+                            assertEquals(tmplinkPositions.get(0).upperPosition, compactingPositions.get(0).lowerPosition);
+                            assertEquals(c.uncompressedLength(), compactingPositions.get(0).upperPosition);
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a831b99f/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 60d8b6c..dbca906 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.SequentialWriterOption;
 import org.apache.cassandra.schema.CompressionParams;
@@ -37,7 +38,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.db.streaming.CompressedInputStream;
 import org.apache.cassandra.db.streaming.CompressionInfo;
 import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -129,11 +129,11 @@ public class CompressedInputStreamTest
         }
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<>();
+        List<SSTableReader.PartitionPositionBounds> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
-            sections.add(Pair.create(position, position + 8));
+            sections.add(new SSTableReader.PartitionPositionBounds(position, position + 8));
         }
         CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
         long totalSize = comp.getTotalSizeForSections(sections);
@@ -179,14 +179,14 @@ public class CompressedInputStreamTest
         {
             for (int i = 0; i < sections.size(); i++)
             {
-                input.position(sections.get(i).left);
+                input.position(sections.get(i).lowerPosition);
                 long readValue = in.readLong();
                 assertEquals("expected " + valuesToCheck[i] + " but was " + 
readValue, valuesToCheck[i], readValue);
             }
         }
     }
 
-    private static void testException(List<Pair<Long, Long>> sections, CompressionInfo info) throws IOException
+    private static void testException(List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo info) throws IOException
     {
         CompressedInputStream input = new CompressedInputStream(new DataInputStreamPlus(new ByteArrayInputStream(new byte[0])), info, ChecksumType.CRC32, () -> 1.0);
 
@@ -195,7 +195,7 @@ public class CompressedInputStreamTest
             for (int i = 0; i < sections.size(); i++)
             {
                 try {
-                    input.position(sections.get(i).left);
+                    input.position(sections.get(i).lowerPosition);
                     in.readLong();
                     fail("Should have thrown IOException");
                 }


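Taken together, the hunks above give the new call shape for mapping uncompressed sections to compressed chunks. A minimal sketch using only the calls that appear in this test (tmp and position stand in for the test's temp data file and looked-up offset):

    // Each section is an uncompressed [lowerPosition, upperPosition) byte range;
    // previously each bound was a boxed Long inside a Pair.
    CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
    List<SSTableReader.PartitionPositionBounds> sections = new ArrayList<>();
    sections.add(new SSTableReader.PartitionPositionBounds(position, position + 8));
    CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
    long totalSize = comp.getTotalSizeForSections(sections);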