Repository: impala Updated Branches: refs/heads/master d428c16f1 -> 7e368b8f0
IMPALA-5987: LZ4 Codec silently produces bogus compressed data for large inputs. When Lz4Compressor::MaxOutputLen returns 0, it means that the input is too large to compress. When Lz4Compressor::ProcessBlock was invoked with an input that was too large, it silently produced a bogus result. This bogus result even decompresses successfully, but not to the data that was originally compressed. After this commit, Lz4Compressor::ProcessBlock will return an error if it cannot compress the input. I also added a comment on Codec::MaxOutputLen() stating that a return value of 0 means that the input is too large. I added some checks after the invocations of MaxOutputLen() where the compressor can be an Lz4Compressor. I added an automated test case to be/src/util/decompress-test.cc. Change-Id: Ifb0bc4ed98c5d7b628b791aa90ead36347b9fbb8 Reviewed-on: http://gerrit.cloudera.org:8080/8748 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/7e368b8f Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/7e368b8f Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/7e368b8f Branch: refs/heads/master Commit: 7e368b8f0f6255d338beb127ff6a3c4e11a28c9e Parents: d428c16 Author: Zoltan Borok-Nagy <borokna...@cloudera.com> Authored: Mon Dec 4 18:05:25 2017 +0100 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Mon Dec 4 21:43:00 2017 +0000 ---------------------------------------------------------------------- .../benchmarks/row-batch-serialize-benchmark.cc | 1 + be/src/catalog/catalog-util.cc | 1 + be/src/runtime/row-batch.cc | 1 + be/src/util/codec.h | 1 + be/src/util/compress.cc | 3 +++ be/src/util/decompress-test.cc | 26 ++++++++++++++++++++ be/src/util/runtime-profile.cc | 4 ++- common/thrift/generate_error_codes.py | 3 +++ 8 files changed, 39 insertions(+), 1 deletion(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/be/src/benchmarks/row-batch-serialize-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc index 699b91b..a2071ca 100644 --- a/be/src/benchmarks/row-batch-serialize-benchmark.cc +++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc @@ -124,6 +124,7 @@ class RowBatchSerializeBaseline { MakeScopeExitTrigger([&compressor]() { compressor.Close(); }); int64_t compressed_size = compressor.MaxOutputLen(size); + DCHECK_GT(compressed_size, 0); if (batch->compression_scratch_.size() < compressed_size) { batch->compression_scratch_.resize(compressed_size); } http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/be/src/catalog/catalog-util.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc index de4f2fd..e968742 100644 --- a/be/src/catalog/catalog-util.cc +++ b/be/src/catalog/catalog-util.cc @@ -195,6 +195,7 @@ Status CompressCatalogObject(string* catalog_object) { &compressor)); string output_buffer; int64_t compressed_data_len = compressor->MaxOutputLen(catalog_object->size()); + DCHECK_GT(compressed_data_len, 0); int64_t output_buffer_len = compressed_data_len + sizeof(uint32_t); output_buffer.resize(output_buffer_len); uint8_t* output_buffer_ptr = http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index cd8c936..bad1f1c 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -265,6 +265,7 @@ Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets, 
MakeScopeExitTrigger([&compressor]() { compressor.Close(); }); int64_t compressed_size = compressor.MaxOutputLen(size); + DCHECK_GT(compressed_size, 0); if (compression_scratch_.size() < compressed_size) { compression_scratch_.resize(compressed_size); } http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/be/src/util/codec.h ---------------------------------------------------------------------- diff --git a/be/src/util/codec.h b/be/src/util/codec.h index d21e399..8d6b88f 100644 --- a/be/src/util/codec.h +++ b/be/src/util/codec.h @@ -149,6 +149,7 @@ class Codec { /// a buffer. /// This must be an O(1) operation (i.e. cannot read all of input). Codecs that /// don't support this should return -1. + /// Return value 0 means error, the input size is too large. virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = nullptr) = 0; /// Must be called on codec before destructor for final cleanup. http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/be/src/util/compress.cc ---------------------------------------------------------------------- diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc index 90656aa..7a0181b 100644 --- a/be/src/util/compress.cc +++ b/be/src/util/compress.cc @@ -311,6 +311,9 @@ Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_lengt DCHECK_GE(input_length, 0); CHECK(output_preallocated) << "Output was not allocated for Lz4 Codec"; if (input_length == 0) return Status::OK(); + if (MaxOutputLen(input_length, input) == 0) { + return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, input_length); + } *output_length = LZ4_compress_default(reinterpret_cast<const char*>(input), reinterpret_cast<char*>(*output), input_length, *output_length); return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/be/src/util/decompress-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/decompress-test.cc 
b/be/src/util/decompress-test.cc index d170501..2a2299c 100644 --- a/be/src/util/decompress-test.cc +++ b/be/src/util/decompress-test.cc @@ -251,6 +251,7 @@ class DecompressorTest : public ::testing::Test { Codec* decompressor, int64_t input_len, uint8_t* input) { // Preallocated output buffers for compressor int64_t max_compressed_length = compressor->MaxOutputLen(input_len, input); + ASSERT_GT(max_compressed_length, 0); uint8_t* compressed = mem_pool_.Allocate(max_compressed_length); int64_t compressed_length = max_compressed_length; @@ -441,6 +442,31 @@ TEST_F(DecompressorTest, Impala5250) { EXPECT_EQ(output_length, 0); } +TEST_F(DecompressorTest, LZ4Huge) { + // IMPALA-5987: When Lz4Compressor::MaxOutputLen() returns 0, + // it means that the input is too large to compress, therefore trying + // to compress it should fail. + + // Generate a big random payload. + int payload_len = numeric_limits<int>::max(); + uint8_t* payload = new uint8_t[payload_len]; + for (int i = 0 ; i < payload_len; ++i) payload[i] = rand(); + + scoped_ptr<Codec> compressor; + EXPECT_OK(Codec::CreateCompressor(nullptr, true, impala::THdfsCompression::LZ4, + &compressor)); + + // The returned max_size is 0 because the payload is too big. 
+ int64_t max_size = compressor->MaxOutputLen(payload_len); + ASSERT_EQ(max_size, 0); + + // Trying to compress it should give an error + int64_t compressed_len = max_size; + uint8_t* compressed = new uint8_t[max_size]; + EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload, + &compressed_len, &compressed), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE); +} + } int main(int argc, char **argv) { http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/be/src/util/runtime-profile.cc ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index 4254b9c..abb691d 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -758,7 +758,9 @@ Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const { MakeScopeExitTrigger([&compressor]() { compressor->Close(); }); vector<uint8_t> compressed_buffer; - compressed_buffer.resize(compressor->MaxOutputLen(serialized_buffer.size())); + int64_t max_compressed_size = compressor->MaxOutputLen(serialized_buffer.size()); + DCHECK_GT(max_compressed_size, 0); + compressed_buffer.resize(max_compressed_size); int64_t result_len = compressed_buffer.size(); uint8_t* compressed_buffer_ptr = compressed_buffer.data(); RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(), http://git-wip-us.apache.org/repos/asf/impala/blob/7e368b8f/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index a108bb7..bf1953e 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -341,6 +341,9 @@ error_codes = ( ("BAD_PRINCIPAL_FORMAT", 112, "Kerberos principal should be of the form: <service>/<hostname>@<realm> - got: $0"), + + ("LZ4_COMPRESSION_INPUT_TOO_LARGE", 113, + "The input size is too large for LZ4 compression: 
$0"), ) import sys