serializedSize implementations, part 1 (gossip and streaming packages)
patch by jbellis; reviewed by yukim for CASSANDRA-3617


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b9fc26c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b9fc26c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b9fc26c

Branch: refs/heads/trunk
Commit: 5b9fc26c51161837f01a9383aad8a2786445a4bd
Parents: 9471e8d
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Mar 26 17:53:59 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue May 8 12:40:53 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/dht/AbstractBounds.java   |   32 ++++++++---
 .../org/apache/cassandra/gms/EndpointState.java    |   13 ++++-
 .../org/apache/cassandra/gms/GossipDigest.java     |    9 ++-
 .../org/apache/cassandra/gms/GossipDigestAck.java  |   44 ++++++++++++---
 .../org/apache/cassandra/gms/GossipDigestAck2.java |   32 +++++++++--
 .../org/apache/cassandra/gms/GossipDigestSyn.java  |   38 +++---------
 .../cassandra/gms/GossipShutdownMessage.java       |    2 +-
 .../org/apache/cassandra/gms/HeartBeatState.java   |    5 +-
 .../org/apache/cassandra/gms/VersionedValue.java   |   21 ++++---
 .../apache/cassandra/service/MigrationManager.java |    4 +-
 .../apache/cassandra/streaming/PendingFile.java    |   18 +++++-
 .../apache/cassandra/streaming/StreamHeader.java   |   14 ++++-
 .../apache/cassandra/streaming/StreamReply.java    |    6 +-
 .../apache/cassandra/streaming/StreamRequest.java  |   28 +++++++--
 .../cassandra/streaming/StreamingRepairTask.java   |   11 +++-
 .../org/apache/cassandra/utils/FBUtilities.java    |    7 ++
 .../org/apache/cassandra/utils/MerkleTree.java     |   20 +++++--
 src/java/org/apache/cassandra/utils/UUIDGen.java   |    4 +-
 18 files changed, 218 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java 
b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index cdca2b2..44344cc 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessagingService;
@@ -118,12 +119,8 @@ public abstract class AbstractBounds<T extends 
RingPosition> implements Serializ
              * The first int tells us if it's a range or bounds (depending on 
the value) _and_ if it's tokens or keys (depending on the
              * sign). We use negative kind for keys so as to preserve the 
serialization of token from older version.
              */
-            boolean isToken = range.left instanceof Token;
-            int kind = range instanceof Range ? Type.RANGE.ordinal() : 
Type.BOUNDS.ordinal();
-            if (!isToken)
-                kind = -(kind+1);
-            out.writeInt(kind);
-            if (isToken)
+            out.writeInt(kindInt(range));
+            if (range.left instanceof Token)
             {
                 Token.serializer().serialize((Token)range.left, out);
                 Token.serializer().serialize((Token)range.right, out);
@@ -135,6 +132,14 @@ public abstract class AbstractBounds<T extends 
RingPosition> implements Serializ
             }
         }
 
+        private int kindInt(AbstractBounds<?> ab)
+        {
+            int kind = ab instanceof Range ? Type.RANGE.ordinal() : 
Type.BOUNDS.ordinal();
+            if (!(ab.left instanceof Token))
+                kind = -(kind + 1);
+            return kind;
+        }
+
         public AbstractBounds<?> deserialize(DataInput in, int version) throws 
IOException
         {
             int kind = in.readInt();
@@ -159,9 +164,20 @@ public abstract class AbstractBounds<T extends 
RingPosition> implements Serializ
             return new Bounds(left, right);
         }
 
-        public long serializedSize(AbstractBounds<?> abstractBounds, int 
version)
+        public long serializedSize(AbstractBounds<?> ab, int version)
         {
-            throw new UnsupportedOperationException();
+            int size = DBTypeSizes.NATIVE.sizeof(kindInt(ab));
+            if (ab.left instanceof Token)
+            {
+                size += Token.serializer().serializedSize((Token) ab.left, 
DBTypeSizes.NATIVE);
+                size += Token.serializer().serializedSize((Token) ab.right, 
DBTypeSizes.NATIVE);
+            }
+            else
+            {
+                size += RowPosition.serializer().serializedSize((RowPosition) 
ab.left, DBTypeSizes.NATIVE);
+                size += RowPosition.serializer().serializedSize((RowPosition) 
ab.right, DBTypeSizes.NATIVE);
+            }
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java 
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 05757ee..31eaafd 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -150,8 +151,16 @@ class EndpointStateSerializer implements 
IVersionedSerializer<EndpointState>
         return epState;
     }
 
-    public long serializedSize(EndpointState endpointState, int version)
+    public long serializedSize(EndpointState epState, int version)
     {
-        throw new UnsupportedOperationException();
+        long size = 
HeartBeatState.serializer().serializedSize(epState.getHeartBeatState(), 
version);
+        size += DBTypeSizes.NATIVE.sizeof(epState.applicationState.size());
+        for (Map.Entry<ApplicationState, VersionedValue> entry : 
epState.applicationState.entrySet())
+        {
+            VersionedValue value = entry.getValue();
+            size += DBTypeSizes.NATIVE.sizeof(entry.getKey().ordinal());
+            size += VersionedValue.serializer.serializedSize(value, version);
+        }
+        return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigest.java 
b/src/java/org/apache/cassandra/gms/GossipDigest.java
index 3dc8294..fcc598e 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigest.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.gms;
 import java.io.*;
 import java.net.InetAddress;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Contains information about a specified list of Endpoints and the largest 
version
@@ -103,8 +105,11 @@ class GossipDigestSerializer implements 
IVersionedSerializer<GossipDigest>
         return new GossipDigest(endpoint, generation, maxVersion);
     }
 
-    public long serializedSize(GossipDigest gossipDigest, int version)
+    public long serializedSize(GossipDigest gDigest, int version)
     {
-        throw new UnsupportedOperationException();
+        long size = 
CompactEndpointSerializationHelper.serializedSize(gDigest.endpoint);
+        size += DBTypeSizes.NATIVE.sizeof(gDigest.generation);
+        size += DBTypeSizes.NATIVE.sizeof(gDigest.maxVersion);
+        return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigestAck.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
index 64a6e3c..0e01d60 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
@@ -17,13 +17,18 @@
  */
 package org.apache.cassandra.gms;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.net.InetAddress;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
-
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
 
 
 /**
@@ -69,20 +74,43 @@ class GossipDigestAckSerializer implements 
IVersionedSerializer<GossipDigestAck>
     public void serialize(GossipDigestAck gDigestAckMessage, DataOutput dos, 
int version) throws IOException
     {
         
GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList, dos, 
version);
-        dos.writeBoolean(true); // 0.6 compatibility
-        
EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap, dos, 
version);
+        if (version <= MessagingService.VERSION_11) // flag was written unconditionally before this change
+            dos.writeBoolean(true); // 0.6 compatibility
+        dos.writeInt(gDigestAckMessage.epStateMap.size());
+        for (Map.Entry<InetAddress, EndpointState> entry : 
gDigestAckMessage.epStateMap.entrySet())
+        {
+            InetAddress ep = entry.getKey();
+            CompactEndpointSerializationHelper.serialize(ep, dos);
+            EndpointState.serializer().serialize(entry.getValue(), dos, 
version);
+        }
     }
 
     public GossipDigestAck deserialize(DataInput dis, int version) throws 
IOException
     {
         List<GossipDigest> gDigestList = 
GossipDigestSerializationHelper.deserialize(dis, version);
-        dis.readBoolean(); // 0.6 compatibility
-        Map<InetAddress, EndpointState> epStateMap = 
EndpointStatesSerializationHelper.deserialize(dis, version);
+        if (version <= MessagingService.VERSION_11) // must mirror the guard in serialize()
+            dis.readBoolean(); // 0.6 compatibility
+        int size = dis.readInt();
+        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>(size);
+
+        for (int i = 0; i < size; ++i)
+        {
+            InetAddress ep = 
CompactEndpointSerializationHelper.deserialize(dis);
+            EndpointState epState = 
EndpointState.serializer().deserialize(dis, version);
+            epStateMap.put(ep, epState);
+        }
         return new GossipDigestAck(gDigestList, epStateMap);
     }
 
-    public long serializedSize(GossipDigestAck gossipDigestAckMessage, int 
version)
+    public long serializedSize(GossipDigestAck ack, int version)
     {
-        throw new UnsupportedOperationException();
+        long size = 
GossipDigestSerializationHelper.serializedSize(ack.gDigestList, version);
+        if (version <= MessagingService.VERSION_11) // same guard as serialize()/deserialize()
+            size += DBTypeSizes.NATIVE.sizeof(true);
+        size += DBTypeSizes.NATIVE.sizeof(ack.epStateMap.size());
+        for (Map.Entry<InetAddress, EndpointState> entry : 
ack.epStateMap.entrySet())
+            size += 
CompactEndpointSerializationHelper.serializedSize(entry.getKey())
+                  + 
EndpointState.serializer().serializedSize(entry.getValue(), version);
+        return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
index a95f43a..c77c223 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
@@ -19,9 +19,12 @@ package org.apache.cassandra.gms;
 
 import java.io.*;
 import java.net.InetAddress;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 
 /**
@@ -57,21 +60,38 @@ public class GossipDigestAck2
 
 class GossipDigestAck2Serializer implements 
IVersionedSerializer<GossipDigestAck2>
 {
-    public void serialize(GossipDigestAck2 gDigestAck2Message, DataOutput dos, 
int version) throws IOException
+    public void serialize(GossipDigestAck2 ack2, DataOutput dos, int version) 
throws IOException
     {
-        /* Use the EndpointState */
-        
EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap, dos, 
version);
+        dos.writeInt(ack2.epStateMap.size());
+        for (Map.Entry<InetAddress, EndpointState> entry : 
ack2.epStateMap.entrySet())
+        {
+            InetAddress ep = entry.getKey();
+            CompactEndpointSerializationHelper.serialize(ep, dos);
+            EndpointState.serializer().serialize(entry.getValue(), dos, 
version);
+        }
     }
 
     public GossipDigestAck2 deserialize(DataInput dis, int version) throws 
IOException
     {
-        Map<InetAddress, EndpointState> epStateMap = 
EndpointStatesSerializationHelper.deserialize(dis, version);
+        int size = dis.readInt();
+        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>(size);
+
+        for (int i = 0; i < size; ++i)
+        {
+            InetAddress ep = 
CompactEndpointSerializationHelper.deserialize(dis);
+            EndpointState epState = 
EndpointState.serializer().deserialize(dis, version);
+            epStateMap.put(ep, epState);
+        }
         return new GossipDigestAck2(epStateMap);
     }
 
-    public long serializedSize(GossipDigestAck2 gossipDigestAck2Message, int 
version)
+    public long serializedSize(GossipDigestAck2 ack2, int version)
     {
-        throw new UnsupportedOperationException();
+        long size = DBTypeSizes.NATIVE.sizeof(ack2.epStateMap.size());
+        for (Map.Entry<InetAddress, EndpointState> entry : 
ack2.epStateMap.entrySet())
+            size += 
CompactEndpointSerializationHelper.serializedSize(entry.getKey())
+                  + 
EndpointState.serializer().serializedSize(entry.getValue(), version);
+        return size;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java 
b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
index e1dd59d..aaec57a 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
@@ -25,8 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 /**
@@ -84,33 +86,13 @@ class GossipDigestSerializationHelper
         }
         return gDigests;
     }
-}
-
-class EndpointStatesSerializationHelper
-{
-    static void serialize(Map<InetAddress, EndpointState> epStateMap, 
DataOutput dos, int version) throws IOException
+    
+    static int serializedSize(List<GossipDigest> digests, int version)
     {
-        dos.writeInt(epStateMap.size());
-        for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
-        {
-            InetAddress ep = entry.getKey();
-            CompactEndpointSerializationHelper.serialize(ep, dos);
-            EndpointState.serializer().serialize(entry.getValue(), dos, 
version);
-        }
-    }
-
-    static Map<InetAddress, EndpointState> deserialize(DataInput dis, int 
version) throws IOException
-    {
-        int size = dis.readInt();
-        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>(size);
-
-        for ( int i = 0; i < size; ++i )
-        {
-            InetAddress ep = 
CompactEndpointSerializationHelper.deserialize(dis);
-            EndpointState epState = 
EndpointState.serializer().deserialize(dis, version);
-            epStateMap.put(ep, epState);
-        }
-        return epStateMap;
+        int size = DBTypeSizes.NATIVE.sizeof(digests.size());
+        for (GossipDigest digest : digests)
+            size += GossipDigest.serializer().serializedSize(digest, version);
+        return size;
     }
 }
 
@@ -129,9 +111,9 @@ class GossipDigestSynSerializer implements 
IVersionedSerializer<GossipDigestSyn>
         return new GossipDigestSyn(clusterId, gDigests);
     }
 
-    public long serializedSize(GossipDigestSyn gossipDigestSynMessage, int 
version)
+    public long serializedSize(GossipDigestSyn syn, int version)
     {
-        throw new UnsupportedOperationException();
+        return FBUtilities.serializedUTF8Size(syn.clusterId) + 
GossipDigestSerializationHelper.serializedSize(syn.gDigests, version);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java 
b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java
index 3122986..d3cdaf1 100644
--- a/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java
@@ -58,6 +58,6 @@ class GossipShutdownMessageSerializer implements 
IVersionedSerializer<GossipShut
 
     public long serializedSize(GossipShutdownMessage gossipShutdownMessage, 
int version)
     {
-        throw new UnsupportedOperationException();
+        return 0;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/HeartBeatState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java 
b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index 294d460..bdbb6a3 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.gms;
 
 import java.io.*;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 
 
@@ -88,8 +89,8 @@ class HeartBeatStateSerializer implements 
IVersionedSerializer<HeartBeatState>
         return new HeartBeatState(dis.readInt(), dis.readInt());
     }
 
-    public long serializedSize(HeartBeatState heartBeatState, int version)
+    public long serializedSize(HeartBeatState state, int version)
     {
-        throw new UnsupportedOperationException();
+        return DBTypeSizes.NATIVE.sizeof(state.getGeneration()) + 
DBTypeSizes.NATIVE.sizeof(state.getHeartBeatVersion());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 10dbb6a..807eb88 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.net.InetAddress;
 import java.util.UUID;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -199,6 +200,12 @@ public class VersionedValue implements 
Comparable<VersionedValue>
     {
         public void serialize(VersionedValue value, DataOutput dos, int 
version) throws IOException
         {
+            dos.writeUTF(outValue(value, version));
+            dos.writeInt(value.version);
+        }
+
+        private String outValue(VersionedValue value, int version)
+        {
             String outValue = value.value;
 
             if (version < MessagingService.VERSION_12)
@@ -206,20 +213,17 @@ public class VersionedValue implements 
Comparable<VersionedValue>
                 String[] pieces = value.value.split(DELIMITER_STR, -1);
                 String type = pieces[0];
 
-                if ((type == STATUS_NORMAL) || type == STATUS_BOOTSTRAPPING)
+                if ((type.equals(STATUS_NORMAL)) || 
type.equals(STATUS_BOOTSTRAPPING))
                 {
                     assert pieces.length >= 3;
                     outValue = versionString(pieces[0], pieces[2]);
                 }
 
-                if ((type == REMOVAL_COORDINATOR) || (type == REMOVING_TOKEN) 
|| (type == REMOVED_TOKEN))
+                if ((type.equals(REMOVAL_COORDINATOR)) || 
(type.equals(REMOVING_TOKEN)) || (type.equals(REMOVED_TOKEN)))
                     throw new RuntimeException(String.format("Unable to 
serialize %s(%s...) for nodes older than 1.2",
-                                                             
VersionedValue.class.getName(),
-                                                             type));
+                                                             
VersionedValue.class.getName(), type));
             }
-
-            dos.writeUTF(outValue);
-            dos.writeInt(value.version);
+            return outValue;
         }
 
         public VersionedValue deserialize(DataInput dis, int version) throws 
IOException
@@ -231,7 +235,8 @@ public class VersionedValue implements 
Comparable<VersionedValue>
 
         public long serializedSize(VersionedValue value, int version)
         {
-            throw new UnsupportedOperationException();
+            String out = outValue(value, version); // same string serialize() passes to writeUTF
+            return FBUtilities.serializedUTF8Size(out) + 
DBTypeSizes.NATIVE.sizeof(value.version);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java 
b/src/java/org/apache/cassandra/service/MigrationManager.java
index 3e88984..7a9b0d7 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.DBConstants;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
@@ -328,11 +327,10 @@ public class MigrationManager implements 
IEndpointStateChangeSubscriber
 
         public long serializedSize(Collection<RowMutation> schema, int version)
         {
-            int size = DBConstants.INT_SIZE;
+            int size = DBTypeSizes.NATIVE.sizeof(schema.size());
             for (RowMutation rm : schema)
                 size += RowMutation.serializer().serializedSize(rm, version);
             return size;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/PendingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/PendingFile.java 
b/src/java/org/apache/cassandra/streaming/PendingFile.java
index 1554f85..41ccf46 100644
--- a/src/java/org/apache/cassandra/streaming/PendingFile.java
+++ b/src/java/org/apache/cassandra/streaming/PendingFile.java
@@ -21,10 +21,12 @@ import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -147,9 +149,21 @@ public class PendingFile
             return new PendingFile(null, desc, component, sections, type, 
estimatedKeys);
         }
 
-        public long serializedSize(PendingFile pendingFile, int version)
+        public long serializedSize(PendingFile pf, int version)
         {
-            throw new UnsupportedOperationException();
+            if (pf == null)
+                return DBTypeSizes.NATIVE.sizeof(0);
+
+            long size = 
FBUtilities.serializedUTF8Size(pf.desc.filenameFor(pf.component));
+            size += FBUtilities.serializedUTF8Size(pf.component);
+            size += DBTypeSizes.NATIVE.sizeof(pf.sections.size());
+            for (Pair<Long,Long> section : pf.sections) // each section contributes two longs
+                size += DBTypeSizes.NATIVE.sizeof(section.left) + 
DBTypeSizes.NATIVE.sizeof(section.right);
+            if (version > MessagingService.VERSION_07)
+                size += FBUtilities.serializedUTF8Size(pf.type.name());
+            if (version > MessagingService.VERSION_080)
+                size += DBTypeSizes.NATIVE.sizeof(pf.estimatedKeys);
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java 
b/src/java/org/apache/cassandra/streaming/StreamHeader.java
index 00d0049..b98aab5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamHeader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
@@ -110,9 +111,16 @@ public class StreamHeader
             return new StreamHeader(table, sessionId, file, pendingFiles, bca);
         }
 
-        public long serializedSize(StreamHeader streamHeader, int version)
+        public long serializedSize(StreamHeader sh, int version)
         {
-            throw new UnsupportedOperationException();
-        }
+            long size = FBUtilities.serializedUTF8Size(sh.table);
+            size += DBTypeSizes.NATIVE.sizeof(sh.sessionId);
+            size += PendingFile.serializer().serializedSize(sh.file, version);
+            size += DBTypeSizes.NATIVE.sizeof(sh.pendingFiles.size());
+            for(PendingFile file : sh.pendingFiles)
+                size += PendingFile.serializer().serializedSize(file, version);
+            size += 
CompactEndpointSerializationHelper.serializedSize(sh.broadcastAddress);
+            return size;
+       }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java 
b/src/java/org/apache/cassandra/streaming/StreamReply.java
index d207d6e..55ded17 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReply.java
@@ -21,9 +21,11 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class StreamReply
 {
@@ -80,9 +82,9 @@ public class StreamReply
             return new StreamReply(targetFile, sessionId, action);
         }
 
-        public long serializedSize(StreamReply streamReply, int version)
+        public long serializedSize(StreamReply reply, int version)
         {
-            throw new UnsupportedOperationException();
+            return DBTypeSizes.NATIVE.sizeof(reply.sessionId) + 
FBUtilities.serializedUTF8Size(reply.file) + 
DBTypeSizes.NATIVE.sizeof(reply.action.ordinal());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java 
b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 3814049..fa1d864 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
@@ -36,6 +37,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
 * This class encapsulates the message that needs to be sent to nodes
@@ -138,9 +140,7 @@ public class StreamRequest
                 dos.writeUTF(srm.table);
                 dos.writeInt(srm.ranges.size());
                 for (Range<Token> range : srm.ranges)
-                {
                     AbstractBounds.serializer().serialize(range, dos, version);
-                }
 
                 if (version > MessagingService.VERSION_07)
                     dos.writeUTF(srm.type.name());
@@ -170,9 +170,7 @@ public class StreamRequest
                 int size = dis.readInt();
                 List<Range<Token>> ranges = (size == 0) ? null : new 
ArrayList<Range<Token>>(size);
                 for( int i = 0; i < size; ++i )
-                {
                     ranges.add((Range<Token>) 
AbstractBounds.serializer().deserialize(dis, version).toTokenBounds());
-                }
                 OperationType type = OperationType.RESTORE_REPLICA_COUNT;
                 if (version > MessagingService.VERSION_07)
                     type = OperationType.valueOf(dis.readUTF());
@@ -189,9 +187,27 @@ public class StreamRequest
             }
         }
 
-        public long serializedSize(StreamRequest streamRequestMessage, int version)
+        public long serializedSize(StreamRequest sr, int version)
         {
-            throw new UnsupportedOperationException();
+            long size = DBTypeSizes.NATIVE.sizeof(sr.sessionId);
+            size += CompactEndpointSerializationHelper.serializedSize(sr.target);
+            size += DBTypeSizes.NATIVE.sizeof(true);
+            if (sr.file != null)
+                return size + PendingFile.serializer().serializedSize(sr.file, version);
+
+            size += FBUtilities.serializedUTF8Size(sr.table);
+            size += DBTypeSizes.NATIVE.sizeof(sr.ranges.size());
+            for (Range<Token> range : sr.ranges)
+                size += AbstractBounds.serializer().serializedSize(range, version);
+            if (version > MessagingService.VERSION_07)
+                size += FBUtilities.serializedUTF8Size(sr.type.name());
+            if (version > MessagingService.VERSION_080)
+            {
+                size += DBTypeSizes.NATIVE.sizeof(Iterables.size(sr.columnFamilies));
+                for (ColumnFamilyStore cfs : sr.columnFamilies)
+                    size += DBTypeSizes.NATIVE.sizeof(cfs.metadata.cfId);
+            }
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 3074bc0..947de9b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
@@ -38,6 +39,7 @@ import org.apache.cassandra.net.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
+
 /**
  * Task that make two nodes exchange (stream) some ranges (for a given table/cf).
  * This handle the case where the local node is neither of the two nodes that
@@ -265,7 +267,14 @@ public class StreamingRepairTask implements Runnable
 
         public long serializedSize(StreamingRepairTask task, int version)
         {
-            throw new UnsupportedOperationException();
+            long size = UUIDGen.serializer.serializedSize(task.id, version);
+            size += 3 * CompactEndpointSerializationHelper.serializedSize(task.owner);
+            size += FBUtilities.serializedUTF8Size(task.tableName);
+            size += FBUtilities.serializedUTF8Size(task.cfName);
+            size += DBTypeSizes.NATIVE.sizeof(task.ranges.size());
+            for (Range<Token> range : task.ranges)
+                size += AbstractBounds.serializer().serializedSize(range, version);
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index ac55d08..8b5e2f3 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.cache.IRowCacheProvider;
 import org.apache.cassandra.concurrent.CreationTimeAwareFuture;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -585,6 +586,12 @@ public class FBUtilities
         }
     }
 
+    public static int serializedUTF8Size(String st)
+    {
+        int length = encodedUTF8Length(st);
+        return DBTypeSizes.NATIVE.sizeof(length) + length;
+    }
+
     private static final class WrappedCloseableIterator<T>
         extends AbstractIterator<T> implements CloseableIterator<T>
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 9b56d76..2cbecb8 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -26,7 +26,7 @@ import java.util.*;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.PeekingIterator;
 
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -103,9 +103,12 @@ public class MerkleTree implements Serializable
             return mt;
         }
 
-        public long serializedSize(MerkleTree merkleTree, int version)
+        public long serializedSize(MerkleTree mt, int version)
         {
-            return 1 + DBConstants.LONG_SIZE + DBConstants.LONG_SIZE + Hashable.serializer.serializedSize(merkleTree.root, version);
+            return DBTypeSizes.NATIVE.sizeof(mt.hashdepth)
+                 + DBTypeSizes.NATIVE.sizeof(mt.maxsize)
+                 + DBTypeSizes.NATIVE.sizeof(mt.size)
+                 + Hashable.serializer.serializedSize(mt.root, version);
         }
     }
 
@@ -711,8 +714,11 @@ public class MerkleTree implements Serializable
 
             public long serializedSize(Inner inner, int version)
             {
-                int size = inner.hash == null ? DBConstants.INT_SIZE : DBConstants.INT_SIZE + inner.hash.length;
-                size += Token.serializer().serializedSize(inner.token)
+                int size = inner.hash == null
+                         ? DBTypeSizes.NATIVE.sizeof(-1)
+                         : DBTypeSizes.NATIVE.sizeof(inner.hash().length) + inner.hash().length;
+
+                size += Token.serializer().serializedSize(inner.token, DBTypeSizes.NATIVE)
                         + Hashable.serializer.serializedSize(inner.lchild, version)
                         + Hashable.serializer.serializedSize(inner.rchild, version);
                 return size;
@@ -790,7 +796,9 @@ public class MerkleTree implements Serializable
 
             public long serializedSize(Leaf leaf, int version)
             {
-                return leaf.hash == null ? DBConstants.INT_SIZE : DBConstants.INT_SIZE + leaf.hash.length;
+                return leaf.hash == null
+                     ? DBTypeSizes.NATIVE.sizeof(-1)
+                     : DBTypeSizes.NATIVE.sizeof(leaf.hash().length) + leaf.hash().length;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 19f2e9b..b83d16d 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 
 /**
@@ -110,7 +110,7 @@ public class UUIDGen
 
         public long serializedSize(UUID uuid, int version)
         {
-            return DBConstants.LONG_SIZE + DBConstants.LONG_SIZE;
+            return DBTypeSizes.NATIVE.sizeof(uuid.getMostSignificantBits()) + DBTypeSizes.NATIVE.sizeof(uuid.getLeastSignificantBits());
         }
     }
 

Reply via email to