Github user jasobrown commented on a diff in the pull request: https://github.com/apache/cassandra/pull/184#discussion_r160523743 --- Diff: src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java --- @@ -21,28 +21,142 @@ import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; +import java.nio.ByteBuffer; -public class CompactEndpointSerializationHelper +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.streaming.messages.StreamMessage; + +/* + * As of version 4.0 the endpoint description includes a port number as an unsigned short + */ +public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort> { - public static void serialize(InetAddress endpoint, DataOutput out) throws IOException + public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper(); + + /** + * Streaming uses it's own version numbering so we need to map those versions to the versions used my regular messaging. + * There are only two variants of the serialization currently so a simple mapping around pre vs post 4.0 is fine. + */ + public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>() { - byte[] buf = endpoint.getAddress(); - out.writeByte(buf.length); - out.write(buf); + public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException + { + if (version < StreamMessage.VERSION_40) + { + instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_30); + } + else + { + instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_40); + } + } + + public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException + { + if (version < StreamMessage.VERSION_40) + { + return instance.deserialize(in, MessagingService.VERSION_30); + } + else + { + return instance.deserialize(in, MessagingService.VERSION_40); + } + } + + public long serializedSize(InetAddressAndPort inetAddressAndPort, int version) + { + if (version < StreamMessage.VERSION_40) + { + return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_30); + } + else + { + return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_40); + } + } + }; + + private CompactEndpointSerializationHelper() {} + + public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException + { + if (version >= MessagingService.VERSION_40) + { + byte[] buf = endpoint.address.getAddress(); + out.writeByte(buf.length + 2); + out.write(buf); + out.writeShort(endpoint.port); + } + else + { + byte[] buf = endpoint.address.getAddress(); + out.writeByte(buf.length); + out.write(buf); + } } - public static InetAddress deserialize(DataInput in) throws IOException + public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException { - byte[] bytes = new byte[in.readByte()]; - in.readFully(bytes, 0, bytes.length); - return InetAddress.getByAddress(bytes); + int size = in.readByte() & 0xFF; + switch(size) + { + //The original pre-4.0 serialiation of just an address + case 4: + case 16: + { + byte[] bytes = new byte[size]; --- End diff -- similar to my comment in the `serialize()` method, it would be cool if we can avoid allocating the byte array (on the heap).
--- --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org