This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/impala.git
commit 22fb381503c713cbbe431fa059968b5c1dab9ec5 Author: Tim Armstrong <tarmstr...@cloudera.com> AuthorDate: Thu May 31 16:25:26 2018 -0700 IMPALA-6035: Add query options to limit thread reservation Adds two options: THREAD_RESERVATION_LIMIT and THREAD_RESERVATION_AGGREGATE_LIMIT, which are both enforced by admission control based on planner resource requirements and the schedule. The mechanism used is the same as the minimum reservation checks. THREAD_RESERVATION_LIMIT limits the total number of reserved threads in fragments scheduled on a single backend. THREAD_RESERVATION_AGGREGATE_LIMIT limits the sum of reserved threads across all fragments. This also slightly improves the minimum reservation error message to include the host name. Testing: Added end-to-end tests that exercise the code paths. Ran core tests. Change-Id: I5b5bbbdad5cd6b24442eb6c99a4d38c2ad710007 Reviewed-on: http://gerrit.cloudera.org:8080/10365 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Reviewed-on: http://gerrit.cloudera.org:8080/12429 Reviewed-by: Quanlong Huang <huangquanl...@gmail.com> --- be/src/scheduling/admission-controller.cc | 68 ++++++++++---- be/src/scheduling/query-schedule.h | 14 ++- be/src/scheduling/scheduler.cc | 1 + be/src/service/query-options-test.cc | 2 + be/src/service/query-options.cc | 20 ++++ be/src/service/query-options.h | 6 +- common/thrift/ImpalaInternalService.thrift | 6 ++ common/thrift/ImpalaService.thrift | 9 ++ .../admission-reject-min-reservation.test | 5 +- .../queries/QueryTest/runtime_row_filters.test | 8 +- .../queries/QueryTest/thread-limits.test | 104 +++++++++++++++++++++ tests/query_test/test_resource_limits.py | 40 ++++++++ 12 files changed, 254 insertions(+), 29 deletions(-) diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index ce6a82c..f94d454 100644 --- a/be/src/scheduling/admission-controller.cc +++ 
b/be/src/scheduling/admission-controller.cc @@ -120,8 +120,8 @@ const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION = "plan. See the query profile for more information about the per-node memory " "requirements."; const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION = - "minimum memory reservation is greater than memory available to the query " - "for buffer reservations. Increase the buffer_pool_limit to $0. See the query " + "minimum memory reservation on backend '$0' is greater than memory available to the " + "query for buffer reservations. Increase the buffer_pool_limit to $1. See the query " "profile for more information about the per-node memory requirements."; const string REASON_MIN_RESERVATION_OVER_POOL_MEM = "minimum memory reservation needed is greater than pool max mem resources. Pool " @@ -140,6 +140,12 @@ const string REASON_REQ_OVER_POOL_MEM = const string REASON_REQ_OVER_NODE_MEM = "request memory needed $0 per node is greater than process mem limit $1 of $2.\n\n" "Use the MEM_LIMIT query option to indicate how much memory is required per node."; +const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED = + "thread reservation on backend '$0' is greater than the THREAD_RESERVATION_LIMIT " + "query option value: $1 > $2."; +const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED = + "sum of thread reservations across all $0 backends is greater than the " + "THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2."; // Queue decision details // $0 = num running queries, $1 = num queries limit @@ -406,17 +412,24 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule, // the checks isn't particularly important, though some thought was given to ordering // them in a way that might make the sense for a user. 
- // Compute the max (over all backends) min_mem_reservation_bytes, the cluster total - // (across all backends) min_mem_reservation_bytes and the min (over all backends) + // Compute the max (over all backends) and cluster total (across all backends) for + // min_mem_reservation_bytes and thread_reservation and the min (over all backends) // min_proc_mem_limit. - int64_t max_min_mem_reservation_bytes = -1; + pair<const TNetworkAddress*, int64_t> largest_min_mem_reservation(nullptr, -1); int64_t cluster_min_mem_reservation_bytes = 0; + pair<const TNetworkAddress*, int64_t> max_thread_reservation(nullptr, 0); pair<const TNetworkAddress*, int64_t> min_proc_mem_limit( nullptr, std::numeric_limits<int64_t>::max()); + int64_t cluster_thread_reservation = 0; for (const auto& e : schedule.per_backend_exec_params()) { cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes; - if (e.second.min_mem_reservation_bytes > max_min_mem_reservation_bytes) { - max_min_mem_reservation_bytes = e.second.min_mem_reservation_bytes; + if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) { + largest_min_mem_reservation = + make_pair(&e.first, e.second.min_mem_reservation_bytes); + } + cluster_thread_reservation += e.second.thread_reservation; + if (e.second.thread_reservation > max_thread_reservation.second) { + max_thread_reservation = make_pair(&e.first, e.second.thread_reservation); } if (e.second.proc_mem_limit < min_proc_mem_limit.second) { min_proc_mem_limit.first = &e.first; @@ -425,27 +438,46 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule, } // Checks related to the min buffer reservation against configured query memory limits: - if (schedule.query_options().__isset.buffer_pool_limit - && schedule.query_options().buffer_pool_limit > 0) { - if (max_min_mem_reservation_bytes > schedule.query_options().buffer_pool_limit) { + const TQueryOptions& query_opts = schedule.query_options(); + if 
(query_opts.__isset.buffer_pool_limit && query_opts.buffer_pool_limit > 0) { + if (largest_min_mem_reservation.second > query_opts.buffer_pool_limit) { *rejection_reason = Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION, - PrintBytes(max_min_mem_reservation_bytes)); + TNetworkAddressToString(*largest_min_mem_reservation.first), + PrintBytes(largest_min_mem_reservation.second)); return true; } - } else if (schedule.query_options().__isset.mem_limit - && schedule.query_options().mem_limit > 0) { - const int64_t mem_limit = schedule.query_options().mem_limit; + } else if (query_opts.__isset.mem_limit && query_opts.mem_limit > 0) { + // If buffer_pool_limit is not explicitly set, it's calculated from mem_limit. + const int64_t mem_limit = query_opts.mem_limit; const int64_t max_reservation = ReservationUtil::GetReservationLimitFromMemLimit(mem_limit); - if (max_min_mem_reservation_bytes > max_reservation) { - const int64_t required_mem_limit = - ReservationUtil::GetMinMemLimitFromReservation(max_min_mem_reservation_bytes); + if (largest_min_mem_reservation.second > max_reservation) { + const int64_t required_mem_limit = ReservationUtil::GetMinMemLimitFromReservation( + largest_min_mem_reservation.second); *rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION, - PrintBytes(max_min_mem_reservation_bytes), PrintBytes(required_mem_limit)); + PrintBytes(largest_min_mem_reservation.second), PrintBytes(required_mem_limit)); return true; } } + // Check thread reservation limits. 
+ if (query_opts.__isset.thread_reservation_limit + && query_opts.thread_reservation_limit > 0 + && max_thread_reservation.second > query_opts.thread_reservation_limit) { + *rejection_reason = Substitute(REASON_THREAD_RESERVATION_LIMIT_EXCEEDED, + TNetworkAddressToString(*max_thread_reservation.first), + max_thread_reservation.second, query_opts.thread_reservation_limit); + return true; + } + if (query_opts.__isset.thread_reservation_aggregate_limit + && query_opts.thread_reservation_aggregate_limit > 0 + && cluster_thread_reservation > query_opts.thread_reservation_aggregate_limit) { + *rejection_reason = Substitute(REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED, + schedule.per_backend_exec_params().size(), cluster_thread_reservation, + query_opts.thread_reservation_aggregate_limit); + return true; + } + // Checks related to pool max_requests: if (pool_cfg.max_requests == 0) { *rejection_reason = REASON_DISABLED_REQUESTS_LIMIT; diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index 870c970..b43cc7b 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -46,7 +46,8 @@ typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges> FragmentScanRangeAssignment; /// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set -/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to a BackendState. +/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to +/// AdmissionController and a BackendState. struct BackendExecParams { /// The fragment instance params assigned to this backend. All instances of a /// particular fragment are contiguous in this vector. Query lifetime; @@ -58,17 +59,22 @@ struct BackendExecParams { // concurrently-executing operators at any point in query execution. It may be less // than the initial reservation total claims (below) if execution of some operators // never overlaps, which allows reuse of reservations. 
- int64_t min_mem_reservation_bytes; + int64_t min_mem_reservation_bytes = 0; // Total of the initial buffer reservations that we expect to be claimed on this // backend for all fragment instances in instance_params. I.e. the sum over all // operators in all fragment instances that execute on this backend. This is used for // an optimization in InitialReservation. Measured in bytes. - int64_t initial_mem_reservation_total_claims; + int64_t initial_mem_reservation_total_claims = 0; + + // Total thread reservation for fragment instances scheduled on this backend. This is + // the peak number of required threads that may be required by the + // concurrently-executing fragment instances at any point in query execution. + int64_t thread_reservation = 0; // The process memory limit of this backend. Obtained from the scheduler's executors // configuration which is updated by membership updates from the statestore. - int64_t proc_mem_limit; + int64_t proc_mem_limit = 0; }; /// map from an impalad host address to the list of assigned fragment instance params. 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 0c9d800..5a67a74 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -783,6 +783,7 @@ void Scheduler::ComputeBackendExecParams( be_params.min_mem_reservation_bytes += f.fragment.min_mem_reservation_bytes; be_params.initial_mem_reservation_total_claims += f.fragment.initial_mem_reservation_total_claims; + be_params.thread_reservation += f.fragment.thread_reservation; } } diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index ed8d986..cb73c9a 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -230,6 +230,8 @@ TEST(QueryOptions, SetIntOptions) { {MAKE_OPTIONDEF(batch_size), {0, 65536}}, {MAKE_OPTIONDEF(query_timeout_s), {0, I32_MAX}}, {MAKE_OPTIONDEF(exec_time_limit_s), {0, I32_MAX}}, + {MAKE_OPTIONDEF(thread_reservation_limit), {-1, I32_MAX}}, + {MAKE_OPTIONDEF(thread_reservation_aggregate_limit), {-1, I32_MAX}}, }; for (const auto& test_case : case_set) { const OptionDef<int32_t>& option_def = test_case.first; diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index a0b7227..08b19c7 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -673,6 +673,26 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_max_mem_estimate_for_admission(bytes_limit); break; } + case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT: + case TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT: { + // Parsing logic is identical for these two options. + StringParser::ParseResult status; + int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status); + if (status != StringParser::PARSE_SUCCESS) { + return Status(Substitute("Invalid thread count: '$0'.", value)); + } + if (val < -1) { + return Status(Substitute("Invalid thread count: '$0'. 
" + "Only -1 and non-negative values are allowed.", val)); + } + if (option == TImpalaQueryOptions::THREAD_RESERVATION_LIMIT) { + query_options->__set_thread_reservation_limit(val); + } else { + DCHECK_EQ(option, TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT); + query_options->__set_thread_reservation_aggregate_limit(val); + } + break; + } default: // We hit this DCHECK(false) if we forgot to add the corresponding entry here // when we add a new query option. diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 24eaed2..dafa0a0 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // the DCHECK. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::MAX_MEM_ESTIMATE_FOR_ADMISSION + 1);\ + TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT + 1);\ QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED,\ TQueryOptionLevel::DEPRECATED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\ @@ -137,6 +137,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION,\ TQueryOptionLevel::ADVANCED)\ + QUERY_OPT_FN(thread_reservation_limit, THREAD_RESERVATION_LIMIT,\ + TQueryOptionLevel::REGULAR)\ + QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,\ + TQueryOptionLevel::REGULAR)\ ; /// Enforce practical limits on some query options to avoid undesired query state. diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 464fe87..2c00090 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -311,6 +311,12 @@ struct TQueryOptions { // See comment in ImpalaService.thrift. 
65: optional i64 max_mem_estimate_for_admission = 0; + + // See comment in ImpalaService.thrift. + 66: optional i32 thread_reservation_limit = 0; + + // See comment in ImpalaService.thrift. + 67: optional i32 thread_reservation_aggregate_limit = 0; } // Impala currently has two types of sessions: Beeswax and HiveServer2 diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index cdc5bd0..7b6afef 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -322,6 +322,15 @@ enum TImpalaQueryOptions { // workaround if the planner's memory estimate is too high and prevents a runnable // query from being admitted. 0 or -1 means this has no effect. Defaults to 0. MAX_MEM_ESTIMATE_FOR_ADMISSION, + + // Admission control will reject queries when the number of reserved threads per backend + // for the query exceeds this number. 0 or -1 means this has no effect. + THREAD_RESERVATION_LIMIT, + + // Admission control will reject queries when the total number of reserved threads + // across all backends for the query exceeds this number. 0 or -1 means this has no + // effect. + THREAD_RESERVATION_AGGREGATE_LIMIT, } // The summary of a DML statement. diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test index 2ddf999..e658c09 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test +++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test @@ -19,8 +19,9 @@ minimum memory reservation needed is greater than pool max mem resources. set buffer_pool_limit=10mb; select distinct * from functional_parquet.alltypesagg ---- CATCH -minimum memory reservation is greater than memory available to the query - for buffer reservations. Increase the buffer_pool_limit to 68.09 MB. 
+row_regex:.*minimum memory reservation on backend '.*' + is greater than memory available to the query + for buffer reservations\. Increase the buffer_pool_limit to 68.09 MB\. ==== ---- QUERY set mem_limit=1024; diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test index d16e9c0..fe61741 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test +++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test @@ -401,10 +401,10 @@ select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b on a.month = b.id and b.int_col = -3 ---- RESULTS ---- CATCH -Rejected query from pool default-pool: minimum memory reservation is - greater than memory available to the query for buffer reservations. Increase - the buffer_pool_limit to 290.17 MB. See the query profile for more information - about the per-node memory requirements. +row_regex:.*Rejected query from pool default-pool: minimum memory reservation on + backend '.*' is greater than memory available to the query for buffer reservations\. + Increase the buffer_pool_limit to 290.17 MB\. See the query profile for more information + about the per-node memory requirements\. ==== ---- QUERY # Confirm that with broadcast join, memory limit is not hit. diff --git a/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test new file mode 100644 index 0000000..ad67c70 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test @@ -0,0 +1,104 @@ +==== +---- QUERY +# Test per-backend limit. The coordinator will get 2 fragments + 1 scanner thread +# scheduled on it. 
+set thread_reservation_limit=2; +select count(*) from alltypes +---- CATCH +row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*' + is greater than the THREAD_RESERVATION_LIMIT query option value: 3 > 2\. +==== +---- QUERY +# Test the boundary case where the thread reservation is exactly the required threads. +set thread_reservation_limit=3; +select count(*) from alltypes +---- TYPES +BIGINT +---- RESULTS +7300 +==== +---- QUERY +# Zero means no limit. +set thread_reservation_limit=0; +select count(*) from alltypes +---- TYPES +BIGINT +---- RESULTS +7300 +==== +---- QUERY +# -1 means no limit. +set thread_reservation_limit=-1; +select count(*) from alltypes +---- TYPES +BIGINT +---- RESULTS +7300 +==== +---- QUERY +# MT_DOP is factored into reservation. +set thread_reservation_limit=3; +set mt_dop=4; +select count(*) from alltypes +---- CATCH +row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*' + is greater than the THREAD_RESERVATION_LIMIT query option value: 5 > 3\. +==== +---- QUERY +# The aggregate limit can reject based on the sum of threads across backends. Assume >= 2 impalads +# with scan ranges plus the coordinator fragment. +set thread_reservation_aggregate_limit=3; +select count(*) from alltypes +---- CATCH +row_regex:.*Rejected query from pool default-pool: sum of thread reservations across + all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option + value: [0-9]+ > 3\. +==== +---- QUERY +# tpch_parquet.nation has only one file, which means only one instance of the scan fragment, +# which means it only has 3 aggregate threads. +set thread_reservation_aggregate_limit=3; +select count(*) from tpch_parquet.nation +---- TYPES +BIGINT +---- RESULTS +25 +==== +---- QUERY +# tpch_parquet.orders has two files, which means more instances of the scan fragment, +# which means it has more than 3 aggregate threads, assuming at least two impalads.
+set thread_reservation_aggregate_limit=3; +select count(*) from tpch_parquet.orders +---- CATCH +row_regex:.*Rejected query from pool default-pool: sum of thread reservations across + all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option + value: [0-9]+ > 3\. +==== +---- QUERY +# Running on a single impalad gets us under the aggregate limit. +set num_nodes=1; +set thread_reservation_aggregate_limit=3; +select count(*) from alltypes +---- TYPES +BIGINT +---- RESULTS +7300 +==== +---- QUERY +# 0 means no limit. +set thread_reservation_aggregate_limit=0; +select count(*) from alltypes +---- TYPES +BIGINT +---- RESULTS +7300 +==== +---- QUERY +# -1 means no limit. +set thread_reservation_aggregate_limit=-1; +select count(*) from alltypes +---- TYPES +BIGINT +---- RESULTS +7300 +==== diff --git a/tests/query_test/test_resource_limits.py b/tests/query_test/test_resource_limits.py new file mode 100644 index 0000000..c0fc7b5 --- /dev/null +++ b/tests/query_test/test_resource_limits.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.skip import SkipIfLocal +from tests.common.test_dimensions import create_parquet_dimension + + +class TestResourceLimits(ImpalaTestSuite): + """Test resource limit functionality.""" + + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestResourceLimits, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension( + create_parquet_dimension(cls.get_workload())) + + @SkipIfLocal.multiple_impalad + def test_thread_limits(self, vector): + # Remove option from vector to allow test file to override it per query. + del vector.get_value('exec_option')['num_nodes'] + self.run_test_case('QueryTest/thread-limits', vector)