Github user aweisberg commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/184#discussion_r163084819
--- Diff:
src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java ---
@@ -21,28 +21,142 @@
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
-public class CompactEndpointSerializationHelper
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/*
+ * As of version 4.0 the endpoint description includes a port number as an
unsigned short
+ */
+public class CompactEndpointSerializationHelper implements
IVersionedSerializer<InetAddressAndPort>
{
- public static void serialize(InetAddress endpoint, DataOutput out)
throws IOException
+ public static final IVersionedSerializer<InetAddressAndPort> instance
= new CompactEndpointSerializationHelper();
+
+ /**
+ * Streaming uses it's own version numbering so we need to map those
versions to the versions used my regular messaging.
+ * There are only two variants of the serialization currently so a
simple mapping around pre vs post 4.0 is fine.
+ */
+ public static final IVersionedSerializer<InetAddressAndPort>
streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
{
- byte[] buf = endpoint.getAddress();
- out.writeByte(buf.length);
- out.write(buf);
+ public void serialize(InetAddressAndPort inetAddressAndPort,
DataOutputPlus out, int version) throws IOException
+ {
+ if (version < StreamMessage.VERSION_40)
+ {
+ instance.serialize(inetAddressAndPort, out,
MessagingService.VERSION_30);
+ }
+ else
+ {
+ instance.serialize(inetAddressAndPort, out,
MessagingService.VERSION_40);
+ }
+ }
+
+ public InetAddressAndPort deserialize(DataInputPlus in, int
version) throws IOException
+ {
+ if (version < StreamMessage.VERSION_40)
+ {
+ return instance.deserialize(in,
MessagingService.VERSION_30);
+ }
+ else
+ {
+ return instance.deserialize(in,
MessagingService.VERSION_40);
+ }
+ }
+
+ public long serializedSize(InetAddressAndPort inetAddressAndPort,
int version)
+ {
+ if (version < StreamMessage.VERSION_40)
+ {
+ return instance.serializedSize(inetAddressAndPort,
MessagingService.VERSION_30);
+ }
+ else
+ {
+ return instance.serializedSize(inetAddressAndPort,
MessagingService.VERSION_40);
+ }
+ }
+ };
+
+ private CompactEndpointSerializationHelper() {}
+
+ public void serialize(InetAddressAndPort endpoint, DataOutputPlus out,
int version) throws IOException
+ {
+ if (version >= MessagingService.VERSION_40)
+ {
+ byte[] buf = endpoint.address.getAddress();
+ out.writeByte(buf.length + 2);
+ out.write(buf);
+ out.writeShort(endpoint.port);
+ }
+ else
+ {
+ byte[] buf = endpoint.address.getAddress();
+ out.writeByte(buf.length);
+ out.write(buf);
+ }
}
- public static InetAddress deserialize(DataInput in) throws IOException
+ public InetAddressAndPort deserialize(DataInputPlus in, int version)
throws IOException
{
- byte[] bytes = new byte[in.readByte()];
- in.readFully(bytes, 0, bytes.length);
- return InetAddress.getByAddress(bytes);
+ int size = in.readByte() & 0xFF;
+ switch(size)
+ {
+ //The original pre-4.0 serialiation of just an address
+ case 4:
+ case 16:
+ {
+ byte[] bytes = new byte[size];
+ in.readFully(bytes, 0, bytes.length);
+ return InetAddressAndPort.getByAddress(bytes);
+ }
+ //Address and one port
+ case 6:
+ case 18:
+ {
+ byte[] bytes = new byte[size - 2];
+ in.readFully(bytes);
+
+ int port = in.readShort() & 0xFFFF;
+ return
InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(bytes),
port);
+ }
+ default:
+ throw new AssertionError("Unexpected size " + size);
+
+ }
}
- public static int serializedSize(InetAddress from)
+ public long serializedSize(InetAddressAndPort from, int version)
{
- if (from instanceof Inet4Address)
- return 1 + 4;
- assert from instanceof Inet6Address;
- return 1 + 16;
+ //4.0 includes a port number
+ if (version >= MessagingService.VERSION_40)
+ {
+ if (from.address instanceof Inet4Address)
+ return 1 + 4 + 2;
+ assert from.address instanceof Inet6Address;
+ return 1 + 16 + 2;
+ }
+ else
+ {
+ if (from.address instanceof Inet4Address)
+ return 1 + 4;
+ assert from.address instanceof Inet6Address;
+ return 1 + 16;
+ }
}
+
+ public static InetAddressAndPort fromBytes(ByteBuffer buffer, int
version)
--- End diff --
You are right. Removed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]