Repository: impala Updated Branches: refs/heads/master ad7cbc94e -> 1765a44da
KUDU-2305: Fix local variable usage to handle 2GB messages The maximum value for rpc_max_message_size (an int32_t) is INT_MAX. However, when using this maximum value, certain local variables in SerializeMessage() and InboundTransfer can overflow if the message size approaches INT_MAX. This changes certain variables to handle messages that approach INT_MAX. Specifically: - Local variables in SerializeMessage() become int64_t rather than int. - The total message size prefixed to the message is now treated as a uint32_t everywhere. This impacts InboundTransfer::total_length_/cur_offset_ and a local variable in ParseMessage(). These changes mean that a sender can serialize a message that is slightly larger than INT_MAX due to the added header size, but the receiver will reject all messages exceeding rpc_max_message_size (maximum INT_MAX). For testing, this modifies rpc-test's TestRpcSidecarLimits to send a message of size INT_MAX rather than rpc_max_message_size+1. This increases the test runtime from about 50ms to 2 seconds. Change-Id: I8e468099b16f7eb55de71b753acae8f57d18287c Reviewed-on: http://gerrit.cloudera.org:8080/9355 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <t...@apache.org> Reviewed-by: Dan Burkert <d...@cloudera.com> Reviewed-on: http://gerrit.cloudera.org:8080/9407 Reviewed-by: Michael Ho <k...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/152cf8b5 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/152cf8b5 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/152cf8b5 Branch: refs/heads/master Commit: 152cf8b54d155cd4699b96f6fa83190f6ac9e9aa Parents: ad7cbc9 Author: Joe McDonnell <joemcdonn...@cloudera.com> Authored: Thu Feb 22 14:53:35 2018 -0800 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Fri Feb 23 06:58:40 2018 +0000 ---------------------------------------------------------------------- be/src/kudu/rpc/rpc-test.cc | 6 +++++- be/src/kudu/rpc/serialization.cc | 13 ++++++++----- be/src/kudu/rpc/transfer.cc | 5 ++++- be/src/kudu/rpc/transfer.h | 4 ++-- 4 files changed, 19 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/152cf8b5/be/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc index 6d9d156..f76cc8f 100644 --- a/be/src/kudu/rpc/rpc-test.cc +++ b/be/src/kudu/rpc/rpc-test.cc @@ -17,6 +17,7 @@ #include "kudu/rpc/rpc-test-base.h" +#include <limits.h> #include <memory> #include <string> #include <unordered_map> @@ -613,7 +614,10 @@ TEST_P(TestRpc, TestRpcSidecarLimits) { GenericCalculatorService::static_service_name()); RpcController controller; - string s(FLAGS_rpc_max_message_size + 1, 'a'); + // KUDU-2305: Test with a maximal payload to verify that the implementation + // can handle the limits. + string s; + s.resize(INT_MAX, 'a'); int idx; CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx)); http://git-wip-us.apache.org/repos/asf/impala/blob/152cf8b5/be/src/kudu/rpc/serialization.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/serialization.cc b/be/src/kudu/rpc/serialization.cc index fbc05bc..c31e79b 100644 --- a/be/src/kudu/rpc/serialization.cc +++ b/be/src/kudu/rpc/serialization.cc @@ -51,9 +51,12 @@ void SerializeMessage(const MessageLite& message, faststring* param_buf, int additional_size, bool use_cached_size) { int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize(); DCHECK_EQ(message.ByteSize(), pb_size); - int recorded_size = pb_size + additional_size; - int size_with_delim = pb_size + CodedOutputStream::VarintSize32(recorded_size); - int total_size = size_with_delim + additional_size; + // Use 8-byte integers to avoid overflowing when additional_size approaches INT_MAX. + int64_t recorded_size = static_cast<int64_t>(pb_size) + + static_cast<int64_t>(additional_size); + int64_t size_with_delim = static_cast<int64_t>(pb_size) + + static_cast<int64_t>(CodedOutputStream::VarintSize32(recorded_size)); + int64_t total_size = size_with_delim + static_cast<int64_t>(additional_size); if (total_size > FLAGS_rpc_max_message_size) { LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the maximum configured " @@ -109,8 +112,8 @@ Status ParseMessage(const Slice& buf, KUDU_REDACT(buf.ToDebugString())); } - int total_len = NetworkByteOrder::Load32(buf.data()); - DCHECK_EQ(total_len + kMsgLengthPrefixLength, buf.size()) + uint32_t total_len = NetworkByteOrder::Load32(buf.data()); + DCHECK_EQ(total_len, buf.size() - kMsgLengthPrefixLength) << "Got mis-sized buffer: " << KUDU_REDACT(buf.ToDebugString()); CodedInputStream in(buf.data(), buf.size()); http://git-wip-us.apache.org/repos/asf/impala/blob/152cf8b5/be/src/kudu/rpc/transfer.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc index da44355..f83c432 100644 --- a/be/src/kudu/rpc/transfer.cc +++ b/be/src/kudu/rpc/transfer.cc @@ -76,7 +76,7 @@ InboundTransfer::InboundTransfer() Status InboundTransfer::ReceiveBuffer(Socket &socket) { if (cur_offset_ < kMsgLengthPrefixLength) { - // receive int32 length prefix + // receive uint32 length prefix int32_t rem = kMsgLengthPrefixLength - cur_offset_; int32_t nread; Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); @@ -115,6 +115,9 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) { // receive message body int32_t nread; + + // If total_length_ > INT_MAX, then it would exceed the maximum rpc_max_message_size + // and exit above. Hence, it is safe to use int32_t here. int32_t rem = total_length_ - cur_offset_; Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); http://git-wip-us.apache.org/repos/asf/impala/blob/152cf8b5/be/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h index 2a2b726..533fe83 100644 --- a/be/src/kudu/rpc/transfer.h +++ b/be/src/kudu/rpc/transfer.h @@ -93,8 +93,8 @@ class InboundTransfer { faststring buf_; - int32_t total_length_; - int32_t cur_offset_; + uint32_t total_length_; + uint32_t cur_offset_; DISALLOW_COPY_AND_ASSIGN(InboundTransfer); };