serializedSize implementations, part 1 (gossip and streaming packages). Patch by jbellis; reviewed by yukim for CASSANDRA-3617.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b9fc26c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b9fc26c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b9fc26c Branch: refs/heads/trunk Commit: 5b9fc26c51161837f01a9383aad8a2786445a4bd Parents: 9471e8d Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Mar 26 17:53:59 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue May 8 12:40:53 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/dht/AbstractBounds.java | 32 ++++++++--- .../org/apache/cassandra/gms/EndpointState.java | 13 ++++- .../org/apache/cassandra/gms/GossipDigest.java | 9 ++- .../org/apache/cassandra/gms/GossipDigestAck.java | 44 ++++++++++++--- .../org/apache/cassandra/gms/GossipDigestAck2.java | 32 +++++++++-- .../org/apache/cassandra/gms/GossipDigestSyn.java | 38 +++--------- .../cassandra/gms/GossipShutdownMessage.java | 2 +- .../org/apache/cassandra/gms/HeartBeatState.java | 5 +- .../org/apache/cassandra/gms/VersionedValue.java | 21 ++++--- .../apache/cassandra/service/MigrationManager.java | 4 +- .../apache/cassandra/streaming/PendingFile.java | 18 +++++- .../apache/cassandra/streaming/StreamHeader.java | 14 ++++- .../apache/cassandra/streaming/StreamReply.java | 6 +- .../apache/cassandra/streaming/StreamRequest.java | 28 +++++++-- .../cassandra/streaming/StreamingRepairTask.java | 11 +++- .../org/apache/cassandra/utils/FBUtilities.java | 7 ++ .../org/apache/cassandra/utils/MerkleTree.java | 20 +++++-- src/java/org/apache/cassandra/utils/UUIDGen.java | 4 +- 18 files changed, 218 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/dht/AbstractBounds.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java index cdca2b2..44344cc 100644 --- a/src/java/org/apache/cassandra/dht/AbstractBounds.java +++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.*; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessagingService; @@ -118,12 +119,8 @@ public abstract class AbstractBounds<T extends RingPosition> implements Serializ * The first int tells us if it's a range or bounds (depending on the value) _and_ if it's tokens or keys (depending on the * sign). We use negative kind for keys so as to preserve the serialization of token from older version. */ - boolean isToken = range.left instanceof Token; - int kind = range instanceof Range ? Type.RANGE.ordinal() : Type.BOUNDS.ordinal(); - if (!isToken) - kind = -(kind+1); - out.writeInt(kind); - if (isToken) + out.writeInt(kindInt(range)); + if (range.left instanceof Token) { Token.serializer().serialize((Token)range.left, out); Token.serializer().serialize((Token)range.right, out); @@ -135,6 +132,14 @@ public abstract class AbstractBounds<T extends RingPosition> implements Serializ } } + private int kindInt(AbstractBounds<?> ab) + { + int kind = ab instanceof Range ? 
Type.RANGE.ordinal() : Type.BOUNDS.ordinal(); + if (!(ab.left instanceof Token)) + kind = -(kind + 1); + return kind; + } + public AbstractBounds<?> deserialize(DataInput in, int version) throws IOException { int kind = in.readInt(); @@ -159,9 +164,20 @@ public abstract class AbstractBounds<T extends RingPosition> implements Serializ return new Bounds(left, right); } - public long serializedSize(AbstractBounds<?> abstractBounds, int version) + public long serializedSize(AbstractBounds<?> ab, int version) { - throw new UnsupportedOperationException(); + int size = DBTypeSizes.NATIVE.sizeof(kindInt(ab)); + if (ab.left instanceof Token) + { + size += Token.serializer().serializedSize((Token) ab.left, DBTypeSizes.NATIVE); + size += Token.serializer().serializedSize((Token) ab.right, DBTypeSizes.NATIVE); + } + else + { + size += RowPosition.serializer().serializedSize((RowPosition) ab.left, DBTypeSizes.NATIVE); + size += RowPosition.serializer().serializedSize((RowPosition) ab.right, DBTypeSizes.NATIVE); + } + return size; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 05757ee..31eaafd 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -23,6 +23,7 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -150,8 +151,16 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState> return epState; } - public long serializedSize(EndpointState endpointState, int version) + public long serializedSize(EndpointState epState, int version) { - 
throw new UnsupportedOperationException(); + long size = HeartBeatState.serializer().serializedSize(epState.getHeartBeatState(), version); + size += DBTypeSizes.NATIVE.sizeof(epState.applicationState.size()); + for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet()) + { + VersionedValue value = entry.getValue(); + size += DBTypeSizes.NATIVE.sizeof(entry.getKey().ordinal()); + size += VersionedValue.serializer.serializedSize(value, version); + } + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigest.java b/src/java/org/apache/cassandra/gms/GossipDigest.java index 3dc8294..fcc598e 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigest.java +++ b/src/java/org/apache/cassandra/gms/GossipDigest.java @@ -20,8 +20,10 @@ package org.apache.cassandra.gms; import java.io.*; import java.net.InetAddress; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.utils.FBUtilities; /** * Contains information about a specified list of Endpoints and the largest version @@ -103,8 +105,11 @@ class GossipDigestSerializer implements IVersionedSerializer<GossipDigest> return new GossipDigest(endpoint, generation, maxVersion); } - public long serializedSize(GossipDigest gossipDigest, int version) + public long serializedSize(GossipDigest gDigest, int version) { - throw new UnsupportedOperationException(); + long size = CompactEndpointSerializationHelper.serializedSize(gDigest.endpoint); + size += DBTypeSizes.NATIVE.sizeof(gDigest.generation); + size += DBTypeSizes.NATIVE.sizeof(gDigest.maxVersion); + return size; } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigestAck.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java b/src/java/org/apache/cassandra/gms/GossipDigestAck.java index 64a6e3c..0e01d60 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAck.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java @@ -17,13 +17,18 @@ */ package org.apache.cassandra.gms; -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.net.InetAddress; +import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; - +import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.net.MessagingService; /** @@ -69,20 +74,43 @@ class GossipDigestAckSerializer implements IVersionedSerializer<GossipDigestAck> public void serialize(GossipDigestAck gDigestAckMessage, DataOutput dos, int version) throws IOException { GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList, dos, version); - dos.writeBoolean(true); // 0.6 compatibility - EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap, dos, version); + if (version <= MessagingService.VERSION_10) + dos.writeBoolean(true); // 0.6 compatibility + dos.writeInt(gDigestAckMessage.epStateMap.size()); + for (Map.Entry<InetAddress, EndpointState> entry : gDigestAckMessage.epStateMap.entrySet()) + { + InetAddress ep = entry.getKey(); + CompactEndpointSerializationHelper.serialize(ep, dos); + EndpointState.serializer().serialize(entry.getValue(), dos, version); + } } public GossipDigestAck deserialize(DataInput dis, int version) throws IOException { List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis, version); - dis.readBoolean(); // 
0.6 compatibility - Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version); + if (version <= MessagingService.VERSION_10) + dis.readBoolean(); // 0.6 compatibility + int size = dis.readInt(); + Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size); + + for (int i = 0; i < size; ++i) + { + InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis); + EndpointState epState = EndpointState.serializer().deserialize(dis, version); + epStateMap.put(ep, epState); + } return new GossipDigestAck(gDigestList, epStateMap); } - public long serializedSize(GossipDigestAck gossipDigestAckMessage, int version) + public long serializedSize(GossipDigestAck ack, int version) { - throw new UnsupportedOperationException(); + long size = GossipDigestSerializationHelper.serializedSize(ack.gDigestList, version); // long, matching the return type, so the long additions below are never narrowed + if (version <= MessagingService.VERSION_10) // same version gate as serialize()/deserialize(): the 0.6 compatibility boolean is only written up to VERSION_10 + size += DBTypeSizes.NATIVE.sizeof(true); + size += DBTypeSizes.NATIVE.sizeof(ack.epStateMap.size()); + for (Map.Entry<InetAddress, EndpointState> entry : ack.epStateMap.entrySet()) + size += CompactEndpointSerializationHelper.serializedSize(entry.getKey()) + + EndpointState.serializer().serializedSize(entry.getValue(), version); + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigestAck2.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java index a95f43a..c77c223 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java @@ -19,9 +19,12 @@ package org.apache.cassandra.gms; import java.io.*; import java.net.InetAddress; +import java.util.HashMap; import java.util.Map; +import org.apache.cassandra.db.DBTypeSizes; import 
org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; /** @@ -57,21 +60,38 @@ public class GossipDigestAck2 class GossipDigestAck2Serializer implements IVersionedSerializer<GossipDigestAck2> { - public void serialize(GossipDigestAck2 gDigestAck2Message, DataOutput dos, int version) throws IOException + public void serialize(GossipDigestAck2 ack2, DataOutput dos, int version) throws IOException { - /* Use the EndpointState */ - EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap, dos, version); + dos.writeInt(ack2.epStateMap.size()); + for (Map.Entry<InetAddress, EndpointState> entry : ack2.epStateMap.entrySet()) + { + InetAddress ep = entry.getKey(); + CompactEndpointSerializationHelper.serialize(ep, dos); + EndpointState.serializer().serialize(entry.getValue(), dos, version); + } } public GossipDigestAck2 deserialize(DataInput dis, int version) throws IOException { - Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version); + int size = dis.readInt(); + Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size); + + for (int i = 0; i < size; ++i) + { + InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis); + EndpointState epState = EndpointState.serializer().deserialize(dis, version); + epStateMap.put(ep, epState); + } return new GossipDigestAck2(epStateMap); } - public long serializedSize(GossipDigestAck2 gossipDigestAck2Message, int version) + public long serializedSize(GossipDigestAck2 ack2, int version) { - throw new UnsupportedOperationException(); + long size = DBTypeSizes.NATIVE.sizeof(ack2.epStateMap.size()); + for (Map.Entry<InetAddress, EndpointState> entry : ack2.epStateMap.entrySet()) + size += CompactEndpointSerializationHelper.serializedSize(entry.getKey()) + + EndpointState.serializer().serializedSize(entry.getValue(), version); + return size; } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipDigestSyn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java index e1dd59d..aaec57a 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java @@ -25,8 +25,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.utils.FBUtilities; /** @@ -84,33 +86,13 @@ class GossipDigestSerializationHelper } return gDigests; } -} - -class EndpointStatesSerializationHelper -{ - static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutput dos, int version) throws IOException + + static int serializedSize(List<GossipDigest> digests, int version) { - dos.writeInt(epStateMap.size()); - for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet()) - { - InetAddress ep = entry.getKey(); - CompactEndpointSerializationHelper.serialize(ep, dos); - EndpointState.serializer().serialize(entry.getValue(), dos, version); - } - } - - static Map<InetAddress, EndpointState> deserialize(DataInput dis, int version) throws IOException - { - int size = dis.readInt(); - Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size); - - for ( int i = 0; i < size; ++i ) - { - InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis); - EndpointState epState = EndpointState.serializer().deserialize(dis, version); - epStateMap.put(ep, epState); - } - return epStateMap; + int size = DBTypeSizes.NATIVE.sizeof(digests.size()); + for (GossipDigest digest : digests) + size += 
GossipDigest.serializer().serializedSize(digest, version); + return size; } } @@ -129,9 +111,9 @@ class GossipDigestSynSerializer implements IVersionedSerializer<GossipDigestSyn> return new GossipDigestSyn(clusterId, gDigests); } - public long serializedSize(GossipDigestSyn gossipDigestSynMessage, int version) + public long serializedSize(GossipDigestSyn syn, int version) { - throw new UnsupportedOperationException(); + return FBUtilities.serializedUTF8Size(syn.clusterId) + GossipDigestSerializationHelper.serializedSize(syn.gDigests, version); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java index 3122986..d3cdaf1 100644 --- a/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java +++ b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java @@ -58,6 +58,6 @@ class GossipShutdownMessageSerializer implements IVersionedSerializer<GossipShut public long serializedSize(GossipShutdownMessage gossipShutdownMessage, int version) { - throw new UnsupportedOperationException(); + return 0; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/HeartBeatState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java b/src/java/org/apache/cassandra/gms/HeartBeatState.java index 294d460..bdbb6a3 100644 --- a/src/java/org/apache/cassandra/gms/HeartBeatState.java +++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java @@ -19,6 +19,7 @@ package org.apache.cassandra.gms; import java.io.*; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; @@ -88,8 +89,8 @@ class 
HeartBeatStateSerializer implements IVersionedSerializer<HeartBeatState> return new HeartBeatState(dis.readInt(), dis.readInt()); } - public long serializedSize(HeartBeatState heartBeatState, int version) + public long serializedSize(HeartBeatState state, int version) { - throw new UnsupportedOperationException(); + return DBTypeSizes.NATIVE.sizeof(state.getGeneration()) + DBTypeSizes.NATIVE.sizeof(state.getHeartBeatVersion()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 10dbb6a..807eb88 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -21,6 +21,7 @@ import java.io.*; import java.net.InetAddress; import java.util.UUID; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; @@ -199,6 +200,12 @@ public class VersionedValue implements Comparable<VersionedValue> { public void serialize(VersionedValue value, DataOutput dos, int version) throws IOException { + dos.writeUTF(outValue(value, version)); + dos.writeInt(value.version); + } + + private String outValue(VersionedValue value, int version) + { String outValue = value.value; if (version < MessagingService.VERSION_12) @@ -206,20 +213,17 @@ public class VersionedValue implements Comparable<VersionedValue> String[] pieces = value.value.split(DELIMITER_STR, -1); String type = pieces[0]; - if ((type == STATUS_NORMAL) || type == STATUS_BOOTSTRAPPING) + if ((type.equals(STATUS_NORMAL)) || type.equals(STATUS_BOOTSTRAPPING)) { assert pieces.length >= 3; outValue = versionString(pieces[0], pieces[2]); } - if ((type == REMOVAL_COORDINATOR) || (type 
== REMOVING_TOKEN) || (type == REMOVED_TOKEN)) + if ((type.equals(REMOVAL_COORDINATOR)) || (type.equals(REMOVING_TOKEN)) || (type.equals(REMOVED_TOKEN))) throw new RuntimeException(String.format("Unable to serialize %s(%s...) for nodes older than 1.2", - VersionedValue.class.getName(), - type)); + VersionedValue.class.getName(), type)); } - - dos.writeUTF(outValue); - dos.writeInt(value.version); + return outValue; } public VersionedValue deserialize(DataInput dis, int version) throws IOException @@ -231,7 +235,8 @@ public class VersionedValue implements Comparable<VersionedValue> public long serializedSize(VersionedValue value, int version) { - throw new UnsupportedOperationException(); + int outLength = FBUtilities.encodedUTF8Length(outValue(value, version)); + return DBTypeSizes.NATIVE.sizeof(outLength) + outLength + DBTypeSizes.NATIVE.sizeof(value.version); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 3e88984..7a9b0d7 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -40,7 +40,6 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.DBConstants; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; @@ -328,11 +327,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public long serializedSize(Collection<RowMutation> schema, int version) { - int size = DBConstants.INT_SIZE; + int size = 
DBTypeSizes.NATIVE.sizeof(schema.size()); for (RowMutation rm : schema) size += RowMutation.serializer().serializedSize(rm, version); return size; } } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/PendingFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/PendingFile.java b/src/java/org/apache/cassandra/streaming/PendingFile.java index 1554f85..41ccf46 100644 --- a/src/java/org/apache/cassandra/streaming/PendingFile.java +++ b/src/java/org/apache/cassandra/streaming/PendingFile.java @@ -21,10 +21,12 @@ import java.io.*; import java.util.ArrayList; import java.util.List; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@ -147,9 +149,21 @@ public class PendingFile return new PendingFile(null, desc, component, sections, type, estimatedKeys); } - public long serializedSize(PendingFile pendingFile, int version) + public long serializedSize(PendingFile pf, int version) { - throw new UnsupportedOperationException(); + if (pf == null) + return DBTypeSizes.NATIVE.sizeof(0); + + long size = FBUtilities.serializedUTF8Size(pf.desc.filenameFor(pf.component)); + size += FBUtilities.serializedUTF8Size(pf.component); + size += DBTypeSizes.NATIVE.sizeof(pf.sections.size()); + for (Pair<Long,Long> section : pf.sections) // each section contributes the size of its left value plus the size of its right value + size += DBTypeSizes.NATIVE.sizeof(section.left) + DBTypeSizes.NATIVE.sizeof(section.right); + if (version > MessagingService.VERSION_07) + size += FBUtilities.serializedUTF8Size(pf.type.name()); + if (version > MessagingService.VERSION_080) + size += DBTypeSizes.NATIVE.sizeof(pf.estimatedKeys); + return size; } } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java index 00d0049..b98aab5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamHeader.java +++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; @@ -110,9 +111,16 @@ public class StreamHeader return new StreamHeader(table, sessionId, file, pendingFiles, bca); } - public long serializedSize(StreamHeader streamHeader, int version) + public long serializedSize(StreamHeader sh, int version) { - throw new UnsupportedOperationException(); - } + long size = FBUtilities.serializedUTF8Size(sh.table); + size += DBTypeSizes.NATIVE.sizeof(sh.sessionId); + size += PendingFile.serializer().serializedSize(sh.file, version); + size += DBTypeSizes.NATIVE.sizeof(sh.pendingFiles.size()); + for(PendingFile file : sh.pendingFiles) + size += PendingFile.serializer().serializedSize(file, version); + size += CompactEndpointSerializationHelper.serializedSize(sh.broadcastAddress); + return size; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamReply.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java index d207d6e..55ded17 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReply.java +++ 
b/src/java/org/apache/cassandra/streaming/StreamReply.java @@ -21,9 +21,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; public class StreamReply { @@ -80,9 +82,9 @@ public class StreamReply return new StreamReply(targetFile, sessionId, action); } - public long serializedSize(StreamReply streamReply, int version) + public long serializedSize(StreamReply reply, int version) { - throw new UnsupportedOperationException(); + return DBTypeSizes.NATIVE.sizeof(reply.sessionId) + FBUtilities.serializedUTF8Size(reply.file) + DBTypeSizes.NATIVE.sizeof(reply.action.ordinal()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java index 3814049..fa1d864 100644 --- a/src/java/org/apache/cassandra/streaming/StreamRequest.java +++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java @@ -28,6 +28,7 @@ import java.util.List; import com.google.common.collect.Iterables; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; @@ -36,6 +37,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; /** * This class encapsulates the message that needs to be sent to 
nodes @@ -138,9 +140,7 @@ public class StreamRequest dos.writeUTF(srm.table); dos.writeInt(srm.ranges.size()); for (Range<Token> range : srm.ranges) - { AbstractBounds.serializer().serialize(range, dos, version); - } if (version > MessagingService.VERSION_07) dos.writeUTF(srm.type.name()); @@ -170,9 +170,7 @@ public class StreamRequest int size = dis.readInt(); List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size); for( int i = 0; i < size; ++i ) - { ranges.add((Range<Token>) AbstractBounds.serializer().deserialize(dis, version).toTokenBounds()); - } OperationType type = OperationType.RESTORE_REPLICA_COUNT; if (version > MessagingService.VERSION_07) type = OperationType.valueOf(dis.readUTF()); @@ -189,9 +187,27 @@ public class StreamRequest } } - public long serializedSize(StreamRequest streamRequestMessage, int version) + public long serializedSize(StreamRequest sr, int version) { - throw new UnsupportedOperationException(); + long size = DBTypeSizes.NATIVE.sizeof(sr.sessionId); + size += CompactEndpointSerializationHelper.serializedSize(sr.target); + size += DBTypeSizes.NATIVE.sizeof(true); + if (sr.file != null) + return size + PendingFile.serializer().serializedSize(sr.file, version); + + size += FBUtilities.serializedUTF8Size(sr.table); + size += DBTypeSizes.NATIVE.sizeof(sr.ranges.size()); + for (Range<Token> range : sr.ranges) + size += AbstractBounds.serializer().serializedSize(range, version); + if (version > MessagingService.VERSION_07) + size += FBUtilities.serializedUTF8Size(sr.type.name()); + if (version > MessagingService.VERSION_080) + { + size += DBTypeSizes.NATIVE.sizeof(Iterables.size(sr.columnFamilies)); + for (ColumnFamilyStore cfs : sr.columnFamilies) + size += DBTypeSizes.NATIVE.sizeof(cfs.metadata.cfId); + } + return size; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java index 3074bc0..947de9b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; @@ -38,6 +39,7 @@ import org.apache.cassandra.net.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; + /** * Task that make two nodes exchange (stream) some ranges (for a given table/cf). * This handle the case where the local node is neither of the two nodes that @@ -265,7 +267,14 @@ public class StreamingRepairTask implements Runnable public long serializedSize(StreamingRepairTask task, int version) { - throw new UnsupportedOperationException(); + long size = UUIDGen.serializer.serializedSize(task.id, version); + size += 3 * CompactEndpointSerializationHelper.serializedSize(task.owner); + size += FBUtilities.serializedUTF8Size(task.tableName); + size += FBUtilities.serializedUTF8Size(task.cfName); + size += DBTypeSizes.NATIVE.sizeof(task.ranges.size()); + for (Range<Token> range : task.ranges) + size += AbstractBounds.serializer().serializedSize(range, version); + return size; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index ac55d08..8b5e2f3 100644 --- 
a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -42,6 +42,7 @@ import org.apache.cassandra.cache.IRowCacheProvider; import org.apache.cassandra.concurrent.CreationTimeAwareFuture; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -585,6 +586,12 @@ public class FBUtilities } } + public static int serializedUTF8Size(String st) + { + int length = encodedUTF8Length(st); + return DBTypeSizes.NATIVE.sizeof(length) + length; + } + private static final class WrappedCloseableIterator<T> extends AbstractIterator<T> implements CloseableIterator<T> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/utils/MerkleTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java index 9b56d76..2cbecb8 100644 --- a/src/java/org/apache/cassandra/utils/MerkleTree.java +++ b/src/java/org/apache/cassandra/utils/MerkleTree.java @@ -26,7 +26,7 @@ import java.util.*; import com.google.common.collect.AbstractIterator; import com.google.common.collect.PeekingIterator; -import org.apache.cassandra.db.DBConstants; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -103,9 +103,12 @@ public class MerkleTree implements Serializable return mt; } - public long serializedSize(MerkleTree merkleTree, int version) + public long serializedSize(MerkleTree mt, int version) { - return 1 + DBConstants.LONG_SIZE + DBConstants.LONG_SIZE + Hashable.serializer.serializedSize(merkleTree.root, version); + return 
DBTypeSizes.NATIVE.sizeof(mt.hashdepth) + + DBTypeSizes.NATIVE.sizeof(mt.maxsize) + + DBTypeSizes.NATIVE.sizeof(mt.size) + + Hashable.serializer.serializedSize(mt.root, version); } } @@ -711,8 +714,11 @@ public class MerkleTree implements Serializable public long serializedSize(Inner inner, int version) { - int size = inner.hash == null ? DBConstants.INT_SIZE : DBConstants.INT_SIZE + inner.hash.length; - size += Token.serializer().serializedSize(inner.token) + int size = inner.hash == null + ? DBTypeSizes.NATIVE.sizeof(-1) + : DBTypeSizes.NATIVE.sizeof(inner.hash().length) + inner.hash().length; + + size += Token.serializer().serializedSize(inner.token, DBTypeSizes.NATIVE) + Hashable.serializer.serializedSize(inner.lchild, version) + Hashable.serializer.serializedSize(inner.rchild, version); return size; @@ -790,7 +796,9 @@ public class MerkleTree implements Serializable public long serializedSize(Leaf leaf, int version) { - return leaf.hash == null ? DBConstants.INT_SIZE : DBConstants.INT_SIZE + leaf.hash.length; + return leaf.hash == null + ? 
DBTypeSizes.NATIVE.sizeof(-1) + : DBTypeSizes.NATIVE.sizeof(leaf.hash().length) + leaf.hash().length; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b9fc26c/src/java/org/apache/cassandra/utils/UUIDGen.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java index 19f2e9b..b83d16d 100644 --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.Random; import java.util.UUID; -import org.apache.cassandra.db.DBConstants; +import org.apache.cassandra.db.DBTypeSizes; import org.apache.cassandra.io.IVersionedSerializer; /** @@ -110,7 +110,7 @@ public class UUIDGen public long serializedSize(UUID uuid, int version) { - return DBConstants.LONG_SIZE + DBConstants.LONG_SIZE; + return DBTypeSizes.NATIVE.sizeof(uuid.getMostSignificantBits()) + DBTypeSizes.NATIVE.sizeof(uuid.getLeastSignificantBits()); } }