Repository: kudu Updated Branches: refs/heads/master 1ae56048b -> 2b0c1c019
KUDU-2305: Fix local variable usage to handle 2GB messages

The maximum value for rpc_max_message_size (an int32_t) is INT_MAX. However, when using this maximum value, certain local variables in SerializeMessage() and InboundTransfer can overflow if the message size approaches INT_MAX.

This patch changes the types of those variables so that they can handle messages whose size approaches INT_MAX. Specifically:
- Local variables in SerializeMessage() become int64_t rather than int.
- The total message size prefixed to the message is now treated as a uint32_t everywhere. This impacts InboundTransfer::total_length_/cur_offset_ and the local variable total_len in ParseMessage().

These changes mean that a sender can serialize a message that is slightly larger than INT_MAX due to the added header size, but the receiver will reject all messages exceeding rpc_max_message_size (maximum INT_MAX).

For testing, this modifies rpc-test's TestRpcSidecarLimits to send a message of size INT_MAX rather than rpc_max_message_size+1. This increases the test runtime from about 50ms to 2 seconds.
Change-Id: I8e468099b16f7eb55de71b753acae8f57d18287c Reviewed-on: http://gerrit.cloudera.org:8080/9355 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <t...@apache.org> Reviewed-by: Dan Burkert <d...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2b0c1c01 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2b0c1c01 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2b0c1c01 Branch: refs/heads/master Commit: 2b0c1c019921e485f06c4be280fedba3d5279672 Parents: 1ae5604 Author: Joe McDonnell <joemcdonn...@cloudera.com> Authored: Fri Feb 16 15:00:37 2018 -0800 Committer: Dan Burkert <d...@cloudera.com> Committed: Thu Feb 22 20:52:27 2018 +0000 ---------------------------------------------------------------------- src/kudu/rpc/rpc-test.cc | 6 +++++- src/kudu/rpc/serialization.cc | 13 ++++++++----- src/kudu/rpc/transfer.cc | 5 ++++- src/kudu/rpc/transfer.h | 4 ++-- 4 files changed, 19 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index b6e563e..94a22cf 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -21,6 +21,7 @@ #include <cstdint> #include <cstdlib> #include <cstring> +#include <limits.h> #include <memory> #include <ostream> #include <set> @@ -700,7 +701,10 @@ TEST_P(TestRpc, TestRpcSidecarLimits) { GenericCalculatorService::static_service_name()); RpcController controller; - string s(FLAGS_rpc_max_message_size + 1, 'a'); + // KUDU-2305: Test with a maximal payload to verify that the implementation + // can handle the limits. 
+ string s; + s.resize(INT_MAX, 'a'); int idx; CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx)); http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/serialization.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/serialization.cc b/src/kudu/rpc/serialization.cc index b6d6393..438ad18 100644 --- a/src/kudu/rpc/serialization.cc +++ b/src/kudu/rpc/serialization.cc @@ -56,9 +56,12 @@ void SerializeMessage(const MessageLite& message, faststring* param_buf, int additional_size, bool use_cached_size) { int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize(); DCHECK_EQ(message.ByteSize(), pb_size); - int recorded_size = pb_size + additional_size; - int size_with_delim = pb_size + CodedOutputStream::VarintSize32(recorded_size); - int total_size = size_with_delim + additional_size; + // Use 8-byte integers to avoid overflowing when additional_size approaches INT_MAX. + int64_t recorded_size = static_cast<int64_t>(pb_size) + + static_cast<int64_t>(additional_size); + int64_t size_with_delim = static_cast<int64_t>(pb_size) + + static_cast<int64_t>(CodedOutputStream::VarintSize32(recorded_size)); + int64_t total_size = size_with_delim + static_cast<int64_t>(additional_size); if (total_size > FLAGS_rpc_max_message_size) { LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the maximum configured " @@ -114,8 +117,8 @@ Status ParseMessage(const Slice& buf, KUDU_REDACT(buf.ToDebugString())); } - int total_len = NetworkByteOrder::Load32(buf.data()); - DCHECK_EQ(total_len + kMsgLengthPrefixLength, buf.size()) + uint32_t total_len = NetworkByteOrder::Load32(buf.data()); + DCHECK_EQ(total_len, buf.size() - kMsgLengthPrefixLength) << "Got mis-sized buffer: " << KUDU_REDACT(buf.ToDebugString()); CodedInputStream in(buf.data(), buf.size()); http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/transfer.cc 
---------------------------------------------------------------------- diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc index 67d845c..ee83d55 100644 --- a/src/kudu/rpc/transfer.cc +++ b/src/kudu/rpc/transfer.cc @@ -77,7 +77,7 @@ InboundTransfer::InboundTransfer() Status InboundTransfer::ReceiveBuffer(Socket &socket) { if (cur_offset_ < kMsgLengthPrefixLength) { - // receive int32 length prefix + // receive uint32 length prefix int32_t rem = kMsgLengthPrefixLength - cur_offset_; int32_t nread; Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); @@ -116,6 +116,9 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) { // receive message body int32_t nread; + + // If total_length_ > INT_MAX, then it would exceed the maximum rpc_max_message_size + // and exit above. Hence, it is safe to use int32_t here. int32_t rem = total_length_ - cur_offset_; Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h index 9561f84..53f4c90 100644 --- a/src/kudu/rpc/transfer.h +++ b/src/kudu/rpc/transfer.h @@ -89,8 +89,8 @@ class InboundTransfer { faststring buf_; - int32_t total_length_; - int32_t cur_offset_; + uint32_t total_length_; + uint32_t cur_offset_; DISALLOW_COPY_AND_ASSIGN(InboundTransfer); };