http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-stress.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc index ba1ad92..3fd33de 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.cc +++ b/be/src/runtime/io/disk-io-mgr-stress.cc @@ -19,6 +19,8 @@ #include "runtime/io/disk-io-mgr-stress.h" +#include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/exec-env.h" #include "runtime/io/request-context.h" #include "util/time.h" @@ -27,21 +29,20 @@ using namespace impala; using namespace impala::io; -static const float ABORT_CHANCE = .10f; -static const int MIN_READ_LEN = 1; -static const int MAX_READ_LEN = 20; +constexpr float DiskIoMgrStress::ABORT_CHANCE; +const int DiskIoMgrStress::MIN_READ_LEN; +const int DiskIoMgrStress::MAX_READ_LEN; -static const int MIN_FILE_LEN = 10; -static const int MAX_FILE_LEN = 1024; +const int DiskIoMgrStress::MIN_FILE_LEN; +const int DiskIoMgrStress::MAX_FILE_LEN; // Make sure this is between MIN/MAX FILE_LEN to test more cases -static const int MIN_READ_BUFFER_SIZE = 64; -static const int MAX_READ_BUFFER_SIZE = 128; +const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE; +const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE; -// Maximum bytes to allocate per scan range. 
-static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3; +const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE; -static const int CANCEL_READER_PERIOD_MS = 20; // in ms +const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS; static void CreateTempFile(const char* filename, const char* data) { FILE* file = fopen(filename, "w"); @@ -50,7 +51,7 @@ static void CreateTempFile(const char* filename, const char* data) { fclose(file); } -string GenerateRandomData() { +string DiskIoMgrStress::GenerateRandomData() { int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN; stringstream ss; for (int i = 0; i < rand_len; ++i) { @@ -62,6 +63,8 @@ string GenerateRandomData() { struct DiskIoMgrStress::Client { boost::mutex lock; + /// Pool for objects that is cleared when the client is (re-)initialized in NewClient(). + ObjectPool obj_pool; unique_ptr<RequestContext> reader; int file_idx; vector<ScanRange*> scan_ranges; @@ -95,6 +98,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk, clients_ = new Client[num_clients_]; client_mem_trackers_.resize(num_clients_); + buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]); for (int i = 0; i < num_clients_; ++i) { NewClient(i); } @@ -119,8 +123,8 @@ void DiskIoMgrStress::ClientThread(int client_id) { CHECK(status.ok() || status.IsCancelled()); if (range == NULL) break; if (needs_buffers) { - status = io_mgr_->AllocateBuffersForRange( - client->reader.get(), range, MAX_BUFFER_BYTES_PER_SCAN_RANGE); + status = io_mgr_->AllocateBuffersForRange(client->reader.get(), + &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE); CHECK(status.ok()) << status.GetDetail(); } @@ -212,7 +216,13 @@ void DiskIoMgrStress::Run(int sec) { } readers_.join_all(); - for (unique_ptr<MemTracker>& mem_tracker : client_mem_trackers_) mem_tracker->Close(); + for (int i = 0; i < num_clients_; ++i) { + if (clients_[i].reader != nullptr) { + 
io_mgr_->UnregisterContext(clients_[i].reader.get()); + } + ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]); + client_mem_trackers_[i]->Close(); + } mem_tracker_.Close(); } @@ -234,26 +244,41 @@ void DiskIoMgrStress::NewClient(int i) { } } - for (int i = 0; i < client.scan_ranges.size(); ++i) { - delete client.scan_ranges[i]; - } + // Clean up leftover state from the previous client (if any). client.scan_ranges.clear(); + ExecEnv* exec_env = ExecEnv::GetInstance(); + exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]); + if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close(); + client.obj_pool.Clear(); int assigned_len = 0; while (assigned_len < file_len) { int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN; range_len = min(range_len, file_len - assigned_len); - ScanRange* range = new ScanRange(); + ScanRange* range = client.obj_pool.Add(new ScanRange); range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len, 0, false, BufferOpts::Uncached()); client.scan_ranges.push_back(range); assigned_len += range_len; } - if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close(); - client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_)); - client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get()); - Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges); + string client_name = Substitute("Client $0", i); + client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_)); + Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr, + exec_env->buffer_reservation(), client_mem_trackers_[i].get(), + numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name), + &buffer_pool_clients_[i]); + CHECK(status.ok()); + // Reserve enough memory for 3 buffers per range, which should be enough to guarantee + // progress. 
+ CHECK(buffer_pool_clients_[i].IncreaseReservationToFit( + MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size())) + << buffer_pool_clients_[i].DebugString() << "\n" + << exec_env->buffer_pool()->DebugString() << "\n" + << exec_env->buffer_reservation()->DebugString(); + + client.reader = io_mgr_->RegisterContext(); + status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges); CHECK(status.ok()); }
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-stress.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h index b872694..574b58c 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.h +++ b/be/src/runtime/io/disk-io-mgr-stress.h @@ -22,8 +22,11 @@ #include <memory> #include <vector> #include <boost/scoped_ptr.hpp> +#include <boost/thread/condition_variable.hpp> #include <boost/thread/thread.hpp> +#include "common/object-pool.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" @@ -43,15 +46,29 @@ class DiskIoMgrStress { /// Run the test for 'sec'. If 0, run forever void Run(int sec); + static constexpr float ABORT_CHANCE = .10f; + static const int MIN_READ_LEN = 1; + static const int MAX_READ_LEN = 20; + + static const int MIN_FILE_LEN = 10; + static const int MAX_FILE_LEN = 1024; + + // Make sure this is between MIN/MAX FILE_LEN to test more cases + static const int MIN_READ_BUFFER_SIZE = 64; + static const int MAX_READ_BUFFER_SIZE = 128; + + // Maximum bytes to allocate per scan range. + static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3; + + static const int CANCEL_READER_PERIOD_MS = 20; private: struct Client; struct File { std::string filename; - std::string data; // the data in the file, used to validate + std::string data; // the data in the file, used to validate }; - /// Files used for testing. These are created at startup and recycled /// during the test std::vector<File> files_; @@ -72,6 +89,9 @@ class DiskIoMgrStress { /// Client MemTrackers, one per client. std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_; + /// Buffer pool clients, one per client. 
+ std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_; + /// If true, tests cancelling readers bool includes_cancellation_; @@ -88,6 +108,8 @@ class DiskIoMgrStress { /// Possibly cancels a random reader. void CancelRandomReader(); + + static std::string GenerateRandomData(); }; } } http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index 95ea184..3a0f727 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -22,14 +22,16 @@ #include "codegen/llvm-codegen.h" #include "common/init.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "runtime/io/disk-io-mgr-stress.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/io/request-context.h" -#include "runtime/mem-tracker.h" #include "runtime/test-env.h" #include "runtime/thread-resource-mgr.h" #include "service/fe-support.h" #include "testutil/gtest-util.h" +#include "testutil/rand-util.h" #include "util/condition-variable.h" #include "util/cpu-info.h" #include "util/disk-info.h" @@ -37,13 +39,20 @@ #include "common/names.h" +using std::mt19937; +using std::uniform_int_distribution; +using std::uniform_real_distribution; + +DECLARE_int64(min_buffer_size); DECLARE_int32(num_remote_hdfs_io_threads); DECLARE_int32(num_s3_io_threads); DECLARE_int32(num_adls_io_threads); const int MIN_BUFFER_SIZE = 128; const int MAX_BUFFER_SIZE = 1024; -const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024; +const int64_t LARGE_RESERVATION_LIMIT = 4L * 1024L * 1024L * 1024L; +const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 1024L; +const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT; namespace impala { namespace io { @@ -53,15 +62,39 @@ class DiskIoMgrTest : public testing::Test { virtual void SetUp() { 
test_env_.reset(new TestEnv); + // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it. + test_env_->SetBufferPoolArgs(1, BUFFER_POOL_CAPACITY); ASSERT_OK(test_env_->Init()); + RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", &rng_); } virtual void TearDown() { + root_reservation_.Close(); pool_.Clear(); + test_env_.reset(); + } + + /// Initialises 'root_reservation_'. The reservation is automatically closed in + /// TearDown(). + void InitRootReservation(int64_t reservation_limit) { + root_reservation_.InitRootTracker( + RuntimeProfile::Create(&pool_, "root"), reservation_limit); + } + + /// Initialise 'client' with the given reservation limit. The client reservation is a + /// child of 'root_reservation_'. + void RegisterBufferPoolClient(int64_t reservation_limit, int64_t initial_reservation, + BufferPool::ClientHandle* client) { + ASSERT_OK(buffer_pool()->RegisterClient("", nullptr, &root_reservation_, nullptr, + reservation_limit, RuntimeProfile::Create(&pool_, ""), client)); + if (initial_reservation > 0) { + ASSERT_TRUE(client->IncreaseReservation(initial_reservation)); + } } + void WriteValidateCallback(int num_writes, WriteRange** written_range, - DiskIoMgr* io_mgr, RequestContext* reader, int32_t* data, - Status expected_status, const Status& status) { + DiskIoMgr* io_mgr, RequestContext* reader, BufferPool::ClientHandle* client, + int32_t* data, Status expected_status, const Status& status) { if (expected_status.code() == TErrorCode::CANCELLED) { EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail(); } else { @@ -71,8 +104,8 @@ class DiskIoMgrTest : public testing::Test { ScanRange* scan_range = pool_.Add(new ScanRange()); scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(), (*written_range)->offset(), 0, false, BufferOpts::Uncached()); - ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data), - sizeof(int32_t)); + ValidateSyncRead(io_mgr, reader, 
client, scan_range, + reinterpret_cast<const char*>(data), sizeof(int32_t)); } { @@ -93,9 +126,13 @@ class DiskIoMgrTest : public testing::Test { protected: void CreateTempFile(const char* filename, const char* data) { + CreateTempFile(filename, data, strlen(data)); + } + + void CreateTempFile(const char* filename, const char* data, int64_t data_bytes) { FILE* file = fopen(filename, "w"); EXPECT_TRUE(file != nullptr); - fwrite(data, 1, strlen(data), file); + fwrite(data, 1, data_bytes, file); fclose(file); } @@ -120,13 +157,14 @@ class DiskIoMgrTest : public testing::Test { } static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader, - ScanRange* range, const char* expected, int expected_len = -1) { + BufferPool::ClientHandle* client, ScanRange* range, const char* expected, + int expected_len = -1) { unique_ptr<BufferDescriptor> buffer; bool needs_buffers; ASSERT_OK(io_mgr->StartScanRange(reader, range, &needs_buffers)); if (needs_buffers) { ASSERT_OK(io_mgr->AllocateBuffersForRange( - reader, range, io_mgr->max_buffer_size())); + reader, client, range, io_mgr->max_buffer_size())); } ASSERT_OK(range->GetNext(&buffer)); ASSERT_TRUE(buffer != nullptr); @@ -161,8 +199,8 @@ class DiskIoMgrTest : public testing::Test { // Continues pulling scan ranges from the io mgr until they are all done. // Updates num_ranges_processed with the number of ranges seen by this thread. 
static void ScanRangeThread(DiskIoMgr* io_mgr, RequestContext* reader, - const char* expected_result, int expected_len, const Status& expected_status, - int max_ranges, AtomicInt32* num_ranges_processed) { + BufferPool::ClientHandle* client, const char* expected_result, int expected_len, + const Status& expected_status, int max_ranges, AtomicInt32* num_ranges_processed) { int num_ranges = 0; while (max_ranges == 0 || num_ranges < max_ranges) { ScanRange* range; @@ -172,7 +210,7 @@ class DiskIoMgrTest : public testing::Test { if (range == nullptr) break; if (needs_buffers) { ASSERT_OK(io_mgr->AllocateBuffersForRange( - reader, range, io_mgr->max_buffer_size() * 3)); + reader, client, range, io_mgr->max_buffer_size() * 3)); } ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status); num_ranges_processed->Add(1); @@ -180,23 +218,27 @@ class DiskIoMgrTest : public testing::Test { } } - ScanRange* AllocateRange() { - return pool_.Add(new ScanRange); - } - - ScanRange* InitRange(const char* file_path, int offset, int len, + ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len, int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) { - ScanRange* range = AllocateRange(); + ScanRange* range = pool->Add(new ScanRange); range->Reset(nullptr, file_path, len, offset, disk_id, true, BufferOpts(is_cached, mtime), meta_data); EXPECT_EQ(mtime, range->mtime()); return range; } + /// Convenience function to get a reference to the buffer pool. + BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); } + boost::scoped_ptr<TestEnv> test_env_; + /// Per-test random number generator. Seeded before every test. + mt19937 rng_; + ObjectPool pool_; + ReservationTracker root_reservation_; + mutex written_mutex_; ConditionVariable writes_done_; int num_ranges_written_; @@ -207,7 +249,7 @@ class DiskIoMgrTest : public testing::Test { // by reading the data back via a separate IoMgr instance. 
All writes are expected to // complete successfully. TEST_F(DiskIoMgrTest, SingleWriter) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); num_ranges_written_ = 0; string tmp_file = "/tmp/disk_io_mgr_test.txt"; int num_ranges = 100; @@ -221,24 +263,27 @@ TEST_F(DiskIoMgrTest, SingleWriter) { } scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10)); - MemTracker reader_mem_tracker(LARGE_MEM_LIMIT); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); ASSERT_OK(read_io_mgr->Init()); - unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker); + unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(); for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.Clear(); // Destroy scan ranges from previous iterations. + // Pool for temporary objects from this iteration only. 
+ ObjectPool tmp_pool; DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10); ASSERT_OK(io_mgr.Init()); - unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker); + unique_ptr<RequestContext> writer = io_mgr.RegisterContext(); for (int i = 0; i < num_ranges; ++i) { - int32_t* data = pool_.Add(new int32_t); + int32_t* data = tmp_pool.Add(new int32_t); *data = rand(); - WriteRange** new_range = pool_.Add(new WriteRange*); - WriteRange::WriteDoneCallback callback = - bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges, - new_range, read_io_mgr.get(), reader.get(), data, Status::OK(), _1); - *new_range = pool_.Add(new WriteRange( - tmp_file, cur_offset, num_ranges % num_disks, callback)); + WriteRange** new_range = tmp_pool.Add(new WriteRange*); + WriteRange::WriteDoneCallback callback = bind( + mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges, new_range, + read_io_mgr.get(), reader.get(), &read_client, data, Status::OK(), _1); + *new_range = tmp_pool.Add( + new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range)); cur_offset += sizeof(int32_t); @@ -254,27 +299,26 @@ TEST_F(DiskIoMgrTest, SingleWriter) { } read_io_mgr->UnregisterContext(reader.get()); - read_io_mgr.reset(); + buffer_pool()->DeregisterClient(&read_client); } // Perform invalid writes (e.g. file in non-existent directory, negative offset) and // validate that an error status is returned via the write callback. 
TEST_F(DiskIoMgrTest, InvalidWrite) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); num_ranges_written_ = 0; string tmp_file = "/non-existent/file.txt"; DiskIoMgr io_mgr(1, 1, 1, 1, 10); ASSERT_OK(io_mgr.Init()); - unique_ptr<RequestContext> writer = io_mgr.RegisterContext(nullptr); + unique_ptr<RequestContext> writer = io_mgr.RegisterContext(); int32_t* data = pool_.Add(new int32_t); *data = rand(); // Write to file in non-existent directory. WriteRange** new_range = pool_.Add(new WriteRange*); WriteRange::WriteDoneCallback callback = - bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, - (DiskIoMgr*)nullptr, (RequestContext*)nullptr, data, - Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); + bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, nullptr, + nullptr, nullptr, data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); *new_range = pool_.Add(new WriteRange(tmp_file, rand(), 0, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); @@ -289,9 +333,9 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { } new_range = pool_.Add(new WriteRange*); - callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, - new_range, (DiskIoMgr*)nullptr, (RequestContext*)nullptr, - data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); + callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, + nullptr, nullptr, nullptr, data, + Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); *new_range = pool_.Add(new WriteRange(tmp_file, -1, 0, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); @@ -309,7 +353,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { // AddWriteRange() is expected to succeed before the cancel and fail after it. // The writes themselves may finish with status cancelled or ok. 
TEST_F(DiskIoMgrTest, SingleWriterCancel) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); num_ranges_written_ = 0; string tmp_file = "/tmp/disk_io_mgr_test.txt"; int num_ranges = 100; @@ -324,29 +368,33 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { } scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10)); - MemTracker reader_mem_tracker(LARGE_MEM_LIMIT); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); ASSERT_OK(read_io_mgr->Init()); - unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker); + unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(); for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.Clear(); // Destroy scan ranges from previous iterations. + // Pool for temporary objects from this iteration only. 
+ ObjectPool tmp_pool; DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10); ASSERT_OK(io_mgr.Init()); - unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker); + unique_ptr<RequestContext> writer = io_mgr.RegisterContext(); Status validate_status = Status::OK(); for (int i = 0; i < num_ranges; ++i) { if (i == num_ranges_before_cancel) { writer->Cancel(); validate_status = Status::CANCELLED; } - int32_t* data = pool_.Add(new int32_t); + int32_t* data = tmp_pool.Add(new int32_t); *data = rand(); - WriteRange** new_range = pool_.Add(new WriteRange*); - WriteRange::WriteDoneCallback callback = bind( - mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel, - new_range, read_io_mgr.get(), reader.get(), data, Status::CANCELLED, _1); - *new_range = pool_.Add(new WriteRange( - tmp_file, cur_offset, num_ranges % num_disks, callback)); + WriteRange** new_range = tmp_pool.Add(new WriteRange*); + WriteRange::WriteDoneCallback callback = + bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, + num_ranges_before_cancel, new_range, read_io_mgr.get(), reader.get(), + &read_client, data, Status::CANCELLED, _1); + *new_range = tmp_pool.Add( + new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); cur_offset += sizeof(int32_t); Status add_status = io_mgr.AddWriteRange(writer.get(), *new_range); @@ -363,13 +411,13 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { } read_io_mgr->UnregisterContext(reader.get()); - read_io_mgr.reset(); + buffer_pool()->DeregisterClient(&read_client); } // Basic test with a single reader, testing multiple threads, disks and a different // number of buffers. 
TEST_F(DiskIoMgrTest, SingleReader) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "abcdefghijklm"; int len = strlen(data); @@ -383,7 +431,7 @@ TEST_F(DiskIoMgrTest, SingleReader) { for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) { - ObjectPool pool; + ObjectPool tmp_pool; LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk << " num_disk=" << num_disks << " num_read_threads=" << num_read_threads; @@ -392,36 +440,39 @@ TEST_F(DiskIoMgrTest, SingleReader) { DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker; - unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); vector<ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); + ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime)); } ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < num_read_threads; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, len, - Status::OK(), 0, &num_ranges_processed)); + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), + &read_client, data, len, Status::OK(), 0, &num_ranges_processed)); } threads.join_all(); EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); io_mgr.UnregisterContext(reader.get()); - 
EXPECT_EQ(reader_mem_tracker.consumption(), 0); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } } } - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } // This test issues adding additional scan ranges while there are some still in flight. TEST_F(DiskIoMgrTest, AddScanRangeTest) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "abcdefghijklm"; int len = strlen(data); @@ -434,7 +485,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { int64_t iters = 0; for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.Clear(); // Destroy scan ranges from previous iterations. + // Pool for temporary objects from this iteration only. + ObjectPool tmp_pool; LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk << " num_disk=" << num_disks; @@ -442,8 +494,10 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker; - unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); vector<ScanRange*> ranges_first_half; vector<ScanRange*> ranges_second_half; @@ -451,10 +505,10 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { int disk_id = i % num_disks; if (i > len / 2) { ranges_second_half.push_back( - InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime)); + InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime)); } else { ranges_first_half.push_back( - InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime)); 
+ InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime)); } } AtomicInt32 num_ranges_processed; @@ -463,8 +517,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_first_half)); // Read a couple of them - ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 2, - &num_ranges_processed); + ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data), + Status::OK(), 2, &num_ranges_processed); // Issue second half ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_second_half)); @@ -472,24 +526,26 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { // Start up some threads and then cancel thread_group threads; for (int i = 0; i < 3; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, - strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); + threads.add_thread( + new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client, data, + strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); } threads.join_all(); EXPECT_EQ(num_ranges_processed.Load(), len); io_mgr.UnregisterContext(reader.get()); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } } - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } // Test to make sure that sync reads and async reads work together // Note: this test is constructed so the number of buffers is greater than the // number of scan ranges. 
TEST_F(DiskIoMgrTest, SyncReadTest) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "abcdefghijklm"; int len = strlen(data); @@ -502,7 +558,8 @@ TEST_F(DiskIoMgrTest, SyncReadTest) { int64_t iters = 0; for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.Clear(); // Destroy scan ranges from previous iterations. + // Pool for temporary objects from this iteration only. + ObjectPool tmp_pool; LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk << " num_disk=" << num_disks; @@ -511,50 +568,55 @@ TEST_F(DiskIoMgrTest, SyncReadTest) { MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker; - unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); + ScanRange* complete_range = - InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime); + InitRange(&tmp_pool, tmp_file, 0, strlen(data), 0, stat_val.st_mtime); // Issue some reads before the async ones are issued - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); vector<ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); + ranges.push_back( + InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime)); } ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); AtomicInt32 
num_ranges_processed; thread_group threads; for (int i = 0; i < 5; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, - strlen(data), Status::OK(), 0, &num_ranges_processed)); + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), + &read_client, data, strlen(data), Status::OK(), 0, &num_ranges_processed)); } // Issue some more sync ranges for (int i = 0; i < 5; ++i) { sched_yield(); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); } threads.join_all(); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); io_mgr.UnregisterContext(reader.get()); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } } - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } // Tests a single reader cancelling half way through scan ranges. TEST_F(DiskIoMgrTest, SingleReaderCancel) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "abcdefghijklm"; int len = strlen(data); @@ -567,7 +629,8 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { int64_t iters = 0; for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.Clear(); // Destroy scan ranges from previous iterations. + // Pool for temporary objects from this iteration only. 
+ ObjectPool tmp_pool; LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk << " num_disk=" << num_disks; @@ -575,13 +638,16 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker; - unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); vector<ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); + ranges.push_back( + InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime)); } ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); @@ -589,16 +655,17 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { int num_succesful_ranges = ranges.size() / 2; // Read half the ranges for (int i = 0; i < num_succesful_ranges; ++i) { - ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 1, - &num_ranges_processed); + ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data), + Status::OK(), 1, &num_ranges_processed); } EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges); // Start up some threads and then cancel thread_group threads; for (int i = 0; i < 3; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, - strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), + &read_client, data, strlen(data), Status::CANCELLED, 0, + &num_ranges_processed)); } reader->Cancel(); @@ -607,87 +674,93 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { threads.join_all(); EXPECT_TRUE(reader->IsCancelled()); io_mgr.UnregisterContext(reader.get()); - 
EXPECT_EQ(reader_mem_tracker.consumption(), 0); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } } - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } -// Test when the reader goes over the mem limit -TEST_F(DiskIoMgrTest, MemLimits) { +// Test readers running with different amounts of memory and getting blocked on scan +// ranges that have run out of buffers. +TEST_F(DiskIoMgrTest, MemScarcity) { const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; - const char* data = "abcdefghijklm"; - int len = strlen(data); - CreateTempFile(tmp_file, data); + // File is 2.5 max buffers so that we can scan file without returning buffers + // when we get the max reservation below. + const int64_t DATA_BYTES = MAX_BUFFER_SIZE * 5 / 2; + char data[DATA_BYTES]; + for (int i = 0; i < DATA_BYTES; ++i) { + data[i] = uniform_int_distribution<uint8_t>(0, 255)(rng_); + } + CreateTempFile(tmp_file, data, DATA_BYTES); // Get mtime for file struct stat stat_val; stat(tmp_file, &stat_val); - const int mem_limit_num_buffers = 2; - // Allocate enough ranges so that the total buffers exceeds the mem limit. + const int RESERVATION_LIMIT_NUM_BUFFERS = 20; + const int64_t RESERVATION_LIMIT = RESERVATION_LIMIT_NUM_BUFFERS * MAX_BUFFER_SIZE; + InitRootReservation(RESERVATION_LIMIT); + + thread_group threads; + // Allocate enough ranges so that the total buffers exceeds the limit. 
const int num_ranges = 25; { - MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE); DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker); - unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient(RESERVATION_LIMIT, RESERVATION_LIMIT, &read_client); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); vector<ScanRange*> ranges; for (int i = 0; i < num_ranges; ++i) { - ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime)); + ranges.push_back(InitRange(&pool_, tmp_file, 0, DATA_BYTES, 0, stat_val.st_mtime)); } ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); - - // Don't return buffers to force memory pressure - vector<pair<ScanRange*, unique_ptr<BufferDescriptor>>> buffers; - - AtomicInt32 num_ranges_processed; - ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(), - 1, &num_ranges_processed); - - bool hit_mem_limit_exceeded = false; - char result[strlen(data) + 1]; - // Keep starting new ranges without returning buffers. This forces us to go over - // the limit eventually. - while (true) { - memset(result, 0, strlen(data) + 1); + // Keep starting new ranges without returning buffers until we run out of + // reservation. 
+ while (read_client.GetUnusedReservation() >= MIN_BUFFER_SIZE) { ScanRange* range = nullptr; bool needs_buffers; - Status status = io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers); - ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded()); - hit_mem_limit_exceeded |= status.IsMemLimitExceeded(); + ASSERT_OK(io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers)); if (range == nullptr) break; - DCHECK(needs_buffers); - status = io_mgr.AllocateBuffersForRange(reader.get(), range, MAX_BUFFER_SIZE * 3); - ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded()); - if (status.IsMemLimitExceeded()) { - hit_mem_limit_exceeded = true; - continue; - } - - while (true) { - unique_ptr<BufferDescriptor> buffer; - Status status = range->GetNext(&buffer); - ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded()); - hit_mem_limit_exceeded |= status.IsMemLimitExceeded(); - if (buffer == nullptr) break; - memcpy(result + range->offset() + buffer->scan_range_offset(), - buffer->buffer(), buffer->len()); - buffers.emplace_back(range, move(buffer)); - } - ValidateEmptyOrCorrect(data, result, strlen(data)); - } - - for (int i = 0; i < buffers.size(); ++i) { - buffers[i].first->ReturnBuffer(move(buffers[i].second)); + ASSERT_TRUE(needs_buffers); + // Pick a random amount of memory to reserve. + int64_t max_bytes_to_alloc = uniform_int_distribution<int64_t>(MIN_BUFFER_SIZE, + min<int64_t>(read_client.GetUnusedReservation(), MAX_BUFFER_SIZE * 3))(rng_); + ASSERT_OK(io_mgr.AllocateBuffersForRange( + reader.get(), &read_client, range, max_bytes_to_alloc)); + // Start a thread fetching from the range. The thread will either finish the + // range or be cancelled. + threads.add_thread(new thread([&data, DATA_BYTES, range] { + // Don't return buffers to force memory pressure. 
+ vector<unique_ptr<BufferDescriptor>> buffers; + int64_t data_offset = 0; + Status status; + while (true) { + unique_ptr<BufferDescriptor> buffer; + status = range->GetNext(&buffer); + ASSERT_TRUE(status.ok() || status.IsCancelled()) << status.GetDetail(); + if (status.IsCancelled() || buffer == nullptr) break; + EXPECT_EQ(0, memcmp(data + data_offset, buffer->buffer(), buffer->len())); + data_offset += buffer->len(); + buffers.emplace_back(move(buffer)); + } + if (status.ok()) ASSERT_EQ(DATA_BYTES, data_offset); + for (auto& buffer : buffers) range->ReturnBuffer(move(buffer)); + })); + // Let the thread start running before starting the next. + SleepForMs(10); } - - EXPECT_TRUE(hit_mem_limit_exceeded) << "Should have run out of memory"; + // Let the threads run for a bit then cancel everything. + SleepForMs(500); + reader->Cancel(); + // Wait until the threads have returned their buffers before unregistering. + threads.join_all(); io_mgr.UnregisterContext(reader.get()); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } } @@ -696,7 +769,7 @@ TEST_F(DiskIoMgrTest, MemLimits) { // only tests the fallback mechanism. 
// TODO: we can fake the cached read path without HDFS TEST_F(DiskIoMgrTest, CachedReads) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "abcdefghijklm"; int len = strlen(data); @@ -711,51 +784,54 @@ TEST_F(DiskIoMgrTest, CachedReads) { DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker; - unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); ScanRange* complete_range = - InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true); + InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true); // Issue some reads before the async ones are issued - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); vector<ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; ranges.push_back( - InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true)); + InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true)); } ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < 5; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, - strlen(data), Status::OK(), 0, &num_ranges_processed)); + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client, + data, strlen(data), Status::OK(), 0, &num_ranges_processed)); } // 
Issue some more sync ranges for (int i = 0; i < 5; ++i) { sched_yield(); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); } threads.join_all(); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); - ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data); EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); io_mgr.UnregisterContext(reader.get()); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } TEST_F(DiskIoMgrTest, MultipleReaderWriter) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const int ITERATIONS = 1; const char* data = "abcdefghijklmnopqrstuvwxyz"; const int num_contexts = 5; @@ -777,17 +853,22 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { int64_t iters = 0; vector<unique_ptr<RequestContext>> contexts(num_contexts); + unique_ptr<BufferPool::ClientHandle[]> clients( + new BufferPool::ClientHandle[num_contexts]); Status status; for (int iteration = 0; iteration < ITERATIONS; ++iteration) { for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { + // Pool for temporary objects from this iteration only. 
+ ObjectPool tmp_pool; DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init()); for (int file_index = 0; file_index < num_contexts; ++file_index) { - contexts[file_index] = io_mgr.RegisterContext(&mem_tracker); + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[file_index]); + contexts[file_index] = io_mgr.RegisterContext(); } - pool_.Clear(); int read_offset = 0; int write_offset = 0; while (read_offset < file_size) { @@ -799,11 +880,11 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset); for (int i = 0; i < num_scan_ranges; ++i) { ranges.push_back(InitRange( - file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime)); - threads.add_thread( - new thread(ScanRangeThread, &io_mgr, contexts[context_index].get(), - reinterpret_cast<const char*>(data + (read_offset % strlen(data))), - 1, Status::OK(), num_scan_ranges, &num_ranges_processed)); + &tmp_pool, file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime)); + threads.add_thread(new thread(ScanRangeThread, &io_mgr, + contexts[context_index].get(), &clients[context_index], + reinterpret_cast<const char*>(data + (read_offset % strlen(data))), + 1, Status::OK(), num_scan_ranges, &num_ranges_processed)); ++read_offset; } @@ -813,7 +894,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback), this, num_write_ranges, _1); - WriteRange* new_range = pool_.Add(new WriteRange( + WriteRange* new_range = tmp_pool.Add(new WriteRange( file_name, write_offset, i % num_disks, callback)); new_range->SetData( reinterpret_cast<const uint8_t*>(data + (write_offset % strlen(data))), @@ -832,6 +913,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { } // while (read_offset < file_size) for (int file_index = 0; file_index < num_contexts; 
++file_index) { io_mgr.UnregisterContext(contexts[file_index].get()); + buffer_pool()->DeregisterClient(&clients[file_index]); } } // for (int num_disks } // for (int threads_per_disk @@ -840,23 +922,19 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { // This test will test multiple concurrent reads each reading a different file. TEST_F(DiskIoMgrTest, MultipleReader) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const int NUM_READERS = 5; const int DATA_LEN = 50; const int ITERATIONS = 25; const int NUM_THREADS_PER_READER = 3; - vector<string> file_names; - vector<int64_t> mtimes; - vector<string> data; - vector<unique_ptr<RequestContext>> readers; - vector<char*> results; - - file_names.resize(NUM_READERS); - readers.resize(NUM_READERS); - mtimes.resize(NUM_READERS); - data.resize(NUM_READERS); - results.resize(NUM_READERS); + vector<string> file_names(NUM_READERS); + vector<int64_t> mtimes(NUM_READERS); + vector<string> data(NUM_READERS); + unique_ptr<BufferPool::ClientHandle[]> clients( + new BufferPool::ClientHandle[NUM_READERS]); + vector<unique_ptr<RequestContext>> readers(NUM_READERS); + vector<char*> results(NUM_READERS); // Initialize data for each reader. The data will be // 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z') @@ -887,7 +965,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) { for (int iteration = 0; iteration < ITERATIONS; ++iteration) { for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.Clear(); // Destroy scan ranges from previous iterations. + // Pool for temporary objects from this iteration only. 
+ ObjectPool tmp_pool; LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk << " num_disk=" << num_disks; if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters; @@ -897,12 +976,14 @@ TEST_F(DiskIoMgrTest, MultipleReader) { ASSERT_OK(io_mgr.Init()); for (int i = 0; i < NUM_READERS; ++i) { - readers[i] = io_mgr.RegisterContext(&mem_tracker); + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[i]); + readers[i] = io_mgr.RegisterContext(); vector<ScanRange*> ranges; for (int j = 0; j < DATA_LEN; ++j) { int disk_id = j % num_disks; - ranges.push_back(InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i])); + ranges.push_back(InitRange(&tmp_pool, file_names[i].c_str(), j, 1, disk_id, mtimes[i])); } ASSERT_OK(io_mgr.AddScanRanges(readers[i].get(), ranges)); } @@ -912,18 +993,20 @@ TEST_F(DiskIoMgrTest, MultipleReader) { for (int i = 0; i < NUM_READERS; ++i) { for (int j = 0; j < NUM_THREADS_PER_READER; ++j) { threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i].get(), - data[i].c_str(), data[i].size(), Status::OK(), 0, &num_ranges_processed)); + &clients[i], data[i].c_str(), data[i].size(), Status::OK(), 0, + &num_ranges_processed)); } } threads.join_all(); EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS); for (int i = 0; i < NUM_READERS; ++i) { io_mgr.UnregisterContext(readers[i].get()); + buffer_pool()->DeregisterClient(&clients[i]); } } } } - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } // Stress test for multiple clients with cancellation @@ -937,7 +1020,7 @@ TEST_F(DiskIoMgrTest, StressTest) { // IMPALA-2366: handle partial read where range goes past end of file. 
TEST_F(DiskIoMgrTest, PartialRead) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "the quick brown fox jumped over the lazy dog"; int len = strlen(data); @@ -956,19 +1039,22 @@ TEST_F(DiskIoMgrTest, PartialRead) { for (int64_t max_buffer_size : max_buffer_sizes) { DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, max_buffer_size); - ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker; unique_ptr<RequestContext> reader; - reader = io_mgr.RegisterContext(&reader_mem_tracker); + reader = io_mgr.RegisterContext(); + + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); // We should not read past the end of file. - ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime); + ScanRange* range = InitRange(&pool_, tmp_file, 0, read_len, 0, stat_val.st_mtime); unique_ptr<BufferDescriptor> buffer; bool needs_buffers; ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers)); if (needs_buffers) { - ASSERT_OK(io_mgr.AllocateBuffersForRange(reader.get(), range, 3 * max_buffer_size)); + ASSERT_OK(io_mgr.AllocateBuffersForRange( + reader.get(), &read_client, range, 3 * max_buffer_size)); } int64_t bytes_read = 0; @@ -994,15 +1080,13 @@ TEST_F(DiskIoMgrTest, PartialRead) { } while (!eosr); io_mgr.UnregisterContext(reader.get()); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); - EXPECT_EQ(mem_tracker.consumption(), 0); - pool_.Clear(); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } } // Test zero-length scan range. 
TEST_F(DiskIoMgrTest, ZeroLengthScanRange) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "the quick brown fox jumped over the lazy dog"; const int64_t MIN_BUFFER_SIZE = 2; @@ -1016,12 +1100,9 @@ TEST_F(DiskIoMgrTest, ZeroLengthScanRange) { DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init()); - MemTracker reader_mem_tracker; - unique_ptr<RequestContext> reader; - reader = io_mgr.RegisterContext(&reader_mem_tracker); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); - // We should not read past the end of file. - ScanRange* range = InitRange(tmp_file, 0, 0, 0, stat_val.st_mtime); + ScanRange* range = InitRange(&pool_, tmp_file, 0, 0, 0, stat_val.st_mtime); bool needs_buffers; Status status = io_mgr.StartScanRange(reader.get(), range, &needs_buffers); ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code()); @@ -1035,7 +1116,6 @@ TEST_F(DiskIoMgrTest, ZeroLengthScanRange) { // Test what happens if don't call AllocateBuffersForRange() after trying to start a // range. TEST_F(DiskIoMgrTest, SkipAllocateBuffers) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "the quick brown fox jumped over the lazy dog"; int len = strlen(data); @@ -1051,13 +1131,12 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) { ASSERT_OK(io_mgr.Init()); MemTracker reader_mem_tracker; - unique_ptr<RequestContext> reader; - reader = io_mgr.RegisterContext(&reader_mem_tracker); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); // We should not read past the end of file. vector<ScanRange*> ranges; for (int i = 0; i < 4; ++i) { - ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime)); + ranges.push_back(InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime)); } bool needs_buffers; // Test StartScanRange(). 
@@ -1080,7 +1159,7 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) { // Test reading into a client-allocated buffer. TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; const char* data = "the quick brown fox jumped over the lazy dog"; int len = strlen(data); @@ -1090,15 +1169,13 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len)); ASSERT_OK(io_mgr->Init()); - // Reader doesn't need to provide mem tracker if it's providing buffers. - MemTracker* reader_mem_tracker = nullptr; - unique_ptr<RequestContext> reader; - reader = io_mgr->RegisterContext(reader_mem_tracker); + // Reader doesn't need to provide client if it's providing buffers. + unique_ptr<RequestContext> reader = io_mgr->RegisterContext(); for (int buffer_len : vector<int>({len - 1, len, len + 1})) { vector<uint8_t> client_buffer(buffer_len); int scan_len = min(len, buffer_len); - ScanRange* range = AllocateRange(); + ScanRange* range = pool_.Add(new ScanRange); range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, BufferOpts::ReadInto(client_buffer.data(), buffer_len)); bool needs_buffers; @@ -1113,32 +1190,31 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { ASSERT_EQ(memcmp(io_buffer->buffer(), data, scan_len), 0); // DiskIoMgr should not have allocated memory. - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); range->ReturnBuffer(move(io_buffer)); } io_mgr->UnregisterContext(reader.get()); - pool_.Clear(); - io_mgr.reset(); - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } // Test reading into a client-allocated buffer where the read fails. 
TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { - MemTracker mem_tracker(LARGE_MEM_LIMIT); + InitRootReservation(LARGE_RESERVATION_LIMIT); const char* tmp_file = "/file/that/does/not/exist"; const int SCAN_LEN = 128; scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN)); ASSERT_OK(io_mgr->Init()); - // Reader doesn't need to provide mem tracker if it's providing buffers. - MemTracker* reader_mem_tracker = nullptr; - unique_ptr<RequestContext> reader; vector<uint8_t> client_buffer(SCAN_LEN); for (int i = 0; i < 1000; ++i) { - reader = io_mgr->RegisterContext(reader_mem_tracker); - ScanRange* range = AllocateRange(); + // Reader doesn't need to provide mem tracker if it's providing buffers. + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + unique_ptr<RequestContext> reader = io_mgr->RegisterContext(); + ScanRange* range = pool_.Add(new ScanRange); range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN)); bool needs_buffers; @@ -1153,25 +1229,25 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { ASSERT_FALSE(range->GetNext(&io_buffer).ok()); // DiskIoMgr should not have allocated memory. - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(read_client.GetUsedReservation(), 0); io_mgr->UnregisterContext(reader.get()); + EXPECT_EQ(read_client.GetUsedReservation(), 0); + buffer_pool()->DeregisterClient(&read_client); } - pool_.Clear(); - io_mgr.reset(); - EXPECT_EQ(mem_tracker.consumption(), 0); + EXPECT_EQ(root_reservation_.GetChildReservations(), 0); } // Test to verify configuration parameters for number of I/O threads per disk. 
TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) { + InitRootReservation(LARGE_RESERVATION_LIMIT); const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads; // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk. // Since we do not have control over which disk is used, we check for either type // (rotational/solid state) - MemTracker mem_tracker(LARGE_MEM_LIMIT); const int num_io_threads_per_rotational_or_ssd = 2; DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd, num_io_threads_per_rotational_or_ssd, 1, 10); http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc index 6dda447..6c7b9e6 100644 --- a/be/src/runtime/io/disk-io-mgr.cc +++ b/be/src/runtime/io/disk-io-mgr.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "common/global-flags.h" #include "runtime/io/disk-io-mgr.h" + +#include "common/global-flags.h" +#include "runtime/exec-env.h" #include "runtime/io/disk-io-mgr-internal.h" #include "runtime/io/handle-cache.inline.h" @@ -52,6 +54,8 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk"); static const int THREADS_PER_ROTATIONAL_DISK = 1; static const int THREADS_PER_SOLID_STATE_DISK = 8; +const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; + // The maximum number of the threads per rotational disk is also the max queue depth per // rotational disk. 
static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of " @@ -309,9 +313,8 @@ Status DiskIoMgr::Init() { return Status::OK(); } -unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) { - return unique_ptr<RequestContext>( - new RequestContext(this, num_total_disks(), mem_tracker)); +unique_ptr<RequestContext> DiskIoMgr::RegisterContext() { + return unique_ptr<RequestContext>(new RequestContext(this, num_total_disks())); } void DiskIoMgr::UnregisterContext(RequestContext* reader) { @@ -455,28 +458,21 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** rang } } -Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader, ScanRange* range, - int64_t max_bytes) { +Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader, + BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) { DCHECK_GE(max_bytes, min_buffer_size_); DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER) << static_cast<int>(range->external_buffer_tag_) << " invalid to allocate buffers " << "when already reading into an external buffer"; - + BufferPool* bp = ExecEnv::GetInstance()->buffer_pool(); Status status; vector<unique_ptr<BufferDescriptor>> buffers; for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) { - if (!reader->mem_tracker_->TryConsume(buffer_size)) { - status = reader->mem_tracker_->MemLimitExceeded(nullptr, - "Failed to allocate I/O buffer", buffer_size); - goto error; - } - uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size)); - if (buffer == nullptr) { - reader->mem_tracker_->Release(buffer_size); - status = Status(Substitute("Failed to malloc $0-byte I/O buffer", buffer_size)); - goto error; - } - buffers.emplace_back(new BufferDescriptor(this, reader, range, buffer, buffer_size)); + BufferPool::BufferHandle handle; + status = bp->AllocateBuffer(bp_client, buffer_size, &handle); + if (!status.ok()) goto error; + 
buffers.emplace_back(new BufferDescriptor( + this, reader, range, bp_client, move(handle))); } range->AddUnusedBuffers(move(buffers), false); return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h index d429d1d..d246e95 100644 --- a/be/src/runtime/io/disk-io-mgr.h +++ b/be/src/runtime/io/disk-io-mgr.h @@ -30,6 +30,7 @@ #include "common/hdfs.h" #include "common/object-pool.h" #include "common/status.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/io/handle-cache.h" #include "runtime/io/request-ranges.h" #include "runtime/thread-resource-mgr.h" @@ -42,8 +43,6 @@ namespace impala { -class MemTracker; - namespace io { /// Manager object that schedules IO for all queries on all disks and remote filesystems /// (such as S3). Each query maps to one or more RequestContext objects, each of which @@ -163,6 +162,7 @@ namespace io { /// buffer and one buffer is in the disk queue. The additional buffer can absorb /// bursts where the producer runs faster than the consumer or the consumer runs /// faster than the producer without blocking either the producer or consumer. +/// See IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE. /// /// Caching support: /// Scan ranges contain metadata on whether or not it is cached on the DN. In that @@ -243,11 +243,7 @@ class DiskIoMgr : public CacheLineAligned { /// Allocates tracking structure for a request context. /// Register a new request context and return it to the caller. The caller must call /// UnregisterContext() for each context. - /// reader_mem_tracker: Is non-null only for readers. IO buffers - /// used for this reader will be tracked by this. If the limit is exceeded - /// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via - /// GetNext(). 
- std::unique_ptr<RequestContext> RegisterContext(MemTracker* reader_mem_tracker); + std::unique_ptr<RequestContext> RegisterContext(); /// Unregisters context from the disk IoMgr by first cancelling it then blocking until /// all references to the context are removed from I/O manager internal data structures. @@ -302,16 +298,15 @@ class DiskIoMgr : public CacheLineAligned { /// *needs_buffers=true. /// /// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >= - /// min_read_buffer_size() so that at least one buffer can be allocated. Returns ok - /// if the buffers were successfully allocated and the range was scheduled. Fails with - /// MEM_LIMIT_EXCEEDED if the buffers could not be allocated. On failure, any allocated - /// buffers are freed and the state of 'range' is unmodified so that allocation can be - /// retried. Setting 'max_bytes' to 3 * max_buffer_size() will typically maximize I/O - /// throughput. See Buffer management" section of the class comment for explanation. - /// TODO: error handling contract will change with reservations. The caller needs to - /// to guarantee that there is sufficient reservation. - Status AllocateBuffersForRange(RequestContext* reader, ScanRange* range, - int64_t max_bytes); + /// min_read_buffer_size() so that at least one buffer can be allocated. The caller + /// must ensure that 'bp_client' has at least 'max_bytes' unused reservation. Returns ok + /// if the buffers were successfully allocated and the range was scheduled. + /// + /// Setting 'max_bytes' to IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size() + /// will typically maximize I/O throughput. See the "Buffer Management" section of + /// the class comment for explanation. + Status AllocateBuffersForRange(RequestContext* reader, + BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes); /// Determine which disk queue this file should be assigned to. Returns an index into /// disk_queues_. 
The disk_id is the volume ID for the local disk that holds the @@ -379,6 +374,10 @@ class DiskIoMgr : public CacheLineAligned { REMOTE_NUM_DISKS }; + /// The ideal number of max-sized buffers per scan range to maximise throughput. + /// See "Buffer Management" in the class comment for explanation. + static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3; + private: friend class BufferDescriptor; friend class RequestContext; @@ -401,7 +400,7 @@ class DiskIoMgr : public CacheLineAligned { /// Maximum read size. This is also the maximum size of each allocated buffer. const int64_t max_buffer_size_; - /// The minimum size of each read buffer. + /// The minimum size of each read buffer. Must be >= BufferPool::min_buffer_len(). const int64_t min_buffer_size_; /// Thread group containing all the worker threads. http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc index b124702..dec6aa6 100644 --- a/be/src/runtime/io/request-context.cc +++ b/be/src/runtime/io/request-context.cc @@ -17,6 +17,8 @@ #include "runtime/io/disk-io-mgr-internal.h" +#include "runtime/exec-env.h" + #include "common/names.h" using namespace impala; @@ -36,12 +38,28 @@ BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, DCHECK_GE(buffer_len, 0); } +BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader, + ScanRange* scan_range, BufferPool::ClientHandle* bp_client, + BufferPool::BufferHandle handle) : + io_mgr_(io_mgr), + reader_(reader), + scan_range_(scan_range), + buffer_(handle.data()), + buffer_len_(handle.len()), + bp_client_(bp_client), + handle_(move(handle)) { + DCHECK(io_mgr != nullptr); + DCHECK(scan_range != nullptr); + DCHECK(bp_client_->is_registered()); + DCHECK(handle_.is_open()); +} + void RequestContext::FreeBuffer(BufferDescriptor* buffer) { 
DCHECK(buffer->buffer_ != nullptr); if (!buffer->is_cached() && !buffer->is_client_buffer()) { - // Only buffers that were not allocated by DiskIoMgr need to have memory freed. - free(buffer->buffer_); - mem_tracker_->Release(buffer->buffer_len_); + // Only buffers that were allocated by DiskIoMgr need to be freed. + ExecEnv::GetInstance()->buffer_pool()->FreeBuffer( + buffer->bp_client_, &buffer->handle_); } buffer->buffer_ = nullptr; } @@ -200,9 +218,8 @@ void RequestContext::RemoveActiveScanRangeLocked( active_scan_ranges_.erase(range); } -RequestContext::RequestContext( - DiskIoMgr* parent, int num_disks, MemTracker* tracker) - : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {} +RequestContext::RequestContext(DiskIoMgr* parent, int num_disks) + : parent_(parent), disk_states_(num_disks) {} // Dumps out request context information. Lock should be taken by caller string RequestContext::DebugString() const { http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-context.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h index 3aea2bc..24fd0fc 100644 --- a/be/src/runtime/io/request-context.h +++ b/be/src/runtime/io/request-context.h @@ -158,12 +158,12 @@ class RequestContext { Inactive, }; - RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker); + RequestContext(DiskIoMgr* parent, int num_disks); - /// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer - /// memory and release the consumption to the client MemTracker. Otherwise (e.g. a - /// client or HDFS cache buffer), just prepares the descriptor to be destroyed. - /// After this is called, buffer->buffer() is NULL. Does not acquire 'lock_'. + /// Cleans up a buffer. If the buffer was allocated with AllocateBuffersForRange(), + /// frees the buffer. Otherwise (e.g. 
a client or HDFS cache buffer), just prepares the + /// descriptor to be destroyed. After this is called, buffer->buffer() is NULL. + /// Does not acquire 'lock_'. void FreeBuffer(BufferDescriptor* buffer); /// Decrements the number of active disks for this reader. If the disk count @@ -239,10 +239,6 @@ class RequestContext { /// Parent object DiskIoMgr* const parent_; - /// Memory used for this reader. This is unowned by this object. - /// TODO: replace with bp client - MemTracker* const mem_tracker_; - /// Total bytes read for this reader RuntimeProfile::Counter* bytes_read_counter_ = nullptr; http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-ranges.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h index 041cb9d..0b234ac 100644 --- a/be/src/runtime/io/request-ranges.h +++ b/be/src/runtime/io/request-ranges.h @@ -32,8 +32,6 @@ #include "util/mem-range.h" namespace impala { -class MemTracker; - namespace io { class DiskIoMgr; class RequestContext; @@ -63,11 +61,15 @@ class BufferDescriptor { friend class ScanRange; friend class RequestContext; - /// Create a buffer descriptor for a new reader, range and data buffer. The buffer - /// memory should already be accounted against 'mem_tracker'. + /// Create a buffer descriptor for a new reader, range and data buffer. BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader, ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len); + /// Create a buffer descriptor allocated from the buffer pool. + BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader, + ScanRange* scan_range, BufferPool::ClientHandle* bp_client, + BufferPool::BufferHandle handle); + /// Return true if this is a cached buffer owned by HDFS. 
bool is_cached() const; @@ -97,6 +99,11 @@ class BufferDescriptor { bool eosr_ = false; int64_t scan_range_offset_ = 0; + + // Handle to an allocated buffer and the client used to allocate it. Only used + // for non-external buffers. + BufferPool::ClientHandle* bp_client_ = nullptr; + BufferPool::BufferHandle handle_; }; /// The request type, read or write associated with a request range. http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index 0663a2b..9c2110c 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -340,9 +340,6 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) { DCHECK(exclusive_hdfs_fh_ == nullptr); DCHECK(local_file_ == nullptr); - // Reader must provide MemTracker or a buffer. - DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER - || reader->mem_tracker_ != nullptr); io_mgr_ = io_mgr; reader_ = reader; local_file_ = nullptr; @@ -650,8 +647,7 @@ Status ScanRange::ReadFromCache( } // Create a single buffer desc for the entire scan range and enqueue that. - // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client, - // not the Impala backend. + // The memory is owned by the HDFS java client, not the Impala backend.
unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor( io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0)); desc->len_ = bytes_read; http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 3a69c33..e0c58d4 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -243,7 +243,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, next_allocation_index_(0), free_ranges_(64) { DCHECK(tmp_file_mgr != nullptr); - io_ctx_ = io_mgr_->RegisterContext(nullptr); + io_ctx_ = io_mgr_->RegisterContext(); } TmpFileMgr::FileGroup::~FileGroup() { http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/common/thrift/PlanNodes.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index c5df1cd..1ab05a0 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -239,6 +239,9 @@ struct THdfsScanNode { // The byte offset of the slot for Parquet metadata if Parquet count star optimization // is enabled. 10: optional i32 parquet_count_star_slot_offset + + // The ideal memory reservation in bytes to process an input split. 
+ 11: optional i64 ideal_scan_range_reservation } struct TDataSourceScanNode { http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java index 4f0a0e1..aae3863 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java +++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java @@ -163,6 +163,25 @@ public class SlotDescriptor { } /** + * Checks if this descriptor describes an array "pos" pseudo-column. + * + * Note: checking whether the column is null distinguishes between top-level columns + * and nested types. This check more specifically looks just for a reference to the + * "pos" field of an array type. + */ + public boolean isArrayPosRef() { + if (parent_ == null) return false; + Type parentType = parent_.getType(); + if (parentType instanceof CollectionStructType) { + if (((CollectionStructType)parentType).isArrayStruct() && + label_.equals(Path.ARRAY_POS_FIELD_NAME)) { + return true; + } + } + return false; + } + + /** * Assembles the absolute materialized path to this slot starting from the schema * root. The materialized path points to the first non-struct schema element along the * path starting from the parent's tuple path to this slot's path. 
http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/fe/src/main/java/org/apache/impala/analysis/SlotRef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java index 23f2d88..0a945bd 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java @@ -153,26 +153,6 @@ public class SlotRef extends Expr { return "<slot " + Integer.toString(desc_.getId().asInt()) + ">"; } - /** - * Checks if this slotRef refers to an array "pos" pseudo-column. - * - * Note: checking whether the column is null distinguishes between top-level columns - * and nested types. This check more specifically looks just for a reference to the - * "pos" field of an array type. - */ - public boolean isArrayPosRef() { - TupleDescriptor parent = getDesc().getParent(); - if (parent == null) return false; - Type parentType = parent.getType(); - if (parentType instanceof CollectionStructType) { - if (((CollectionStructType)parentType).isArrayStruct() && - getDesc().getLabel().equals(Path.ARRAY_POS_FIELD_NAME)) { - return true; - } - } - return false; - } - @Override protected void toThrift(TExprNode msg) { msg.node_type = TExprNodeType.SLOT_REF;