Repository: impala Updated Branches: refs/heads/master 4d6b07f0e -> 2896b8d12
IMPALA-6801: Cleanup request_pool Eliminate the copy of this field in the QuerySchedule. Instead, just set it directly in the TQueryCtx early on. Then, the TQueryCtx doesn't need to be copied by the coordinator. Change-Id: I3bee843ef7d72ba14d487fdb56e55fa3660aafd3 Reviewed-on: http://gerrit.cloudera.org:8080/9909 Reviewed-by: Dan Hecht <dhe...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/58ee5eca Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/58ee5eca Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/58ee5eca Branch: refs/heads/master Commit: 58ee5ecadf2b389fe47dc87085a7d63b4c364e26 Parents: 4d6b07f Author: Dan Hecht <dhe...@cloudera.com> Authored: Tue Apr 3 10:42:39 2018 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Thu Apr 5 03:47:54 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator.cc | 12 +++++------- be/src/runtime/coordinator.h | 3 --- be/src/scheduling/query-schedule.h | 8 +++----- be/src/scheduling/scheduler.cc | 9 ++++++--- be/src/service/client-request-state.h | 7 +++---- be/src/service/impala-beeswax-server.cc | 2 +- be/src/service/impala-hs2-server.cc | 2 +- be/src/service/impala-http-handler.cc | 2 +- be/src/service/impala-server.cc | 9 +++++++-- be/src/service/impala-server.h | 16 ++++++++-------- common/thrift/ImpalaInternalService.thrift | 4 ++-- 11 files changed, 37 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index e6b3bca..5a3de5d 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -90,11 +90,9 @@ Status Coordinator::Exec() { const TQueryExecRequest& request = schedule_.request(); DCHECK(request.plan_exec_info.size() > 0); - VLOG_QUERY << "Exec() query_id=" << schedule_.query_id() + VLOG_QUERY << "Exec() query_id=" << query_id() << " stmt=" << request.query_ctx.client_request.stmt; stmt_type_ = request.stmt_type; - query_ctx_ = request.query_ctx; - query_ctx_.__set_request_pool(schedule_.request_pool()); query_profile_ = RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id())); @@ -117,7 +115,7 @@ Status Coordinator::Exec() { // TODO: revisit this, it may not be true anymore lock_guard<mutex> l(lock_); - query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx_); + query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx()); query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources(). filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker( -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false)); @@ -355,7 +353,7 @@ void Coordinator::StartBackendExec() { for (BackendState* backend_state: backend_states_) { ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer( [backend_state, this, &debug_options]() { - backend_state->Exec(query_ctx_, debug_options, filter_routing_table_, + backend_state->Exec(query_ctx(), debug_options, filter_routing_table_, exec_complete_barrier_.get()); }); } @@ -783,7 +781,7 @@ void Coordinator::ReleaseAdmissionControlResources() { void Coordinator::ReleaseAdmissionControlResourcesLocked() { if (released_admission_control_resources_) return; LOG(INFO) << "Release admission control resources for query_id=" - << PrintId(query_ctx_.query_id); + << PrintId(query_ctx().query_id); AdmissionController* admission_controller = ExecEnv::GetInstance()->admission_controller(); if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_); @@ -941,7 +939,7 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) { } const TUniqueId& Coordinator::query_id() const { - return query_ctx_.query_id; + return query_ctx().query_id; } void Coordinator::GetTExecSummary(TExecSummary* exec_summary) { http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 6665c08..8e556ec 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -187,9 +187,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// owned by the ClientRequestState that owns this coordinator const QuerySchedule& schedule_; - /// copied from TQueryExecRequest; constant across all fragments - TQueryCtx query_ctx_; - /// copied from TQueryExecRequest, governs when to call ReportQuerySummary TStmtType::type stmt_type_; http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index 28ccd6f..be219a8 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -143,8 +143,9 @@ class QuerySchedule { const TUniqueId& query_id() const { return query_id_; } const TQueryExecRequest& request() const { return request_; } const TQueryOptions& query_options() const { return query_options_; } - const std::string& request_pool() const { return request_pool_; } - void set_request_pool(const std::string& pool_name) { request_pool_ = pool_name; } + + // Valid after Schedule() succeeds. + const std::string& request_pool() const { return request().query_ctx.request_pool; } /// Gets the estimated memory (bytes) per-node. Returns the user specified estimate /// (MEM_LIMIT query parameter) if provided or the estimate from planning if available, @@ -250,9 +251,6 @@ class QuerySchedule { /// Used to generate consecutive fragment instance ids. TUniqueId next_instance_id_; - /// Request pool to which the request was submitted for admission. - std::string request_pool_; - /// Indicates if the query has been admitted for execution. bool is_admitted_; http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 5a9d4bf..b091415 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -693,10 +693,13 @@ Status Scheduler::Schedule(QuerySchedule* schedule) { // TODO: Move to admission control, it doesn't need to be in the Scheduler. string resolved_pool; + // Re-resolve the pool name to propagate any resolution errors now that this request + // is known to require a valid pool. RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool( - schedule->request().query_ctx, &resolved_pool)); - schedule->set_request_pool(resolved_pool); - schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool); + schedule->request().query_ctx, &resolved_pool)); + // Resolved pool name should have been set in the TQueryCtx and shouldn't have changed. + DCHECK_EQ(resolved_pool, schedule->request_pool()); + schedule->summary_profile()->AddInfoString("Request Pool", schedule->request_pool()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/service/client-request-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index 657f3de..05cd762 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -159,12 +159,11 @@ class ClientRequestState { Coordinator* coord() const { return coord_.get(); } QuerySchedule* schedule() { return schedule_.get(); } - /// Resource pool associated with this query, or an empty string if the schedule has not - /// been created and had the pool set yet, or this StmtType doesn't go through admission - /// control. + /// Admission control resource pool associated with this query. std::string request_pool() const { - return schedule_ == nullptr ? "" : schedule_->request_pool(); + return query_ctx_.__isset.request_pool ? query_ctx_.request_pool : ""; } + int num_rows_fetched() const { return num_rows_fetched_; } void set_fetched_rows() { fetched_rows_ = true; } bool fetched_rows() const { return fetched_rows_; } http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index c441285..4875adb 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -458,7 +458,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query, // Only query options not set in the session or confOverlay can be overridden by the // pool options. - AddPoolQueryOptions(query_ctx, ~set_query_options_mask); + AddPoolConfiguration(query_ctx, ~set_query_options_mask); VLOG_QUERY << "TClientRequest.queryOptions: " << ThriftDebugString(query_ctx->client_request.query_options); return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 80ace87..765fccf 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -256,7 +256,7 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext( } // Only query options not set in the session or confOverlay can be overridden by the // pool options. - AddPoolQueryOptions(query_ctx, ~set_query_options_mask); + AddPoolConfiguration(query_ctx, ~set_query_options_mask); VLOG_QUERY << "TClientRequest.queryOptions: " << ThriftDebugString(query_ctx->client_request.query_options); return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 3841bfe..9b8d597 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -361,7 +361,7 @@ void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& r Value val_waiting_time(waiting_time_str.c_str(), document->GetAllocator()); value->AddMember("waiting_time", val_waiting_time, document->GetAllocator()); - Value resource_pool(record.request_pool.c_str(), document->GetAllocator()); + Value resource_pool(record.resource_pool.c_str(), document->GetAllocator()); value->AddMember("resource_pool", resource_pool, document->GetAllocator()); } http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 5eeb52a..f6cd6e5 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -782,7 +782,7 @@ void ImpalaServer::ArchiveQuery(const ClientRequestState& query) { ImpalaServer::~ImpalaServer() {} -void ImpalaServer::AddPoolQueryOptions(TQueryCtx* ctx, +void ImpalaServer::AddPoolConfiguration(TQueryCtx* ctx, const QueryOptionsMask& override_options_mask) { // Errors are not returned and are only logged (at level 2) because some incoming // requests are not expected to be mapped to a pool and will not have query options, @@ -796,6 +796,7 @@ void ImpalaServer::AddPoolQueryOptions(TQueryCtx* ctx, << " ResolveRequestPool status: " << status.GetDetail(); return; } + ctx->__set_request_pool(resolved_pool); TPoolConfig config; status = exec_env_->request_pool_service()->GetPoolConfig(resolved_pool, &config); @@ -1717,7 +1718,11 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque } all_rows_returned = request_state.eos(); last_active_time_ms = request_state.last_active_ms(); - request_pool = request_state.request_pool(); + // For statement types other than QUERY/DML, show an empty string for resource pool + // to indicate that they are not subjected to admission control. + if (stmt_type == TStmtType::QUERY || stmt_type == TStmtType::DML) { + resource_pool = request_state.request_pool(); + } user_has_profile_access = request_state.user_has_profile_access(); } http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 2af89fd..fb3f261 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -706,9 +706,9 @@ class ImpalaServer : public ImpalaServiceIf, // The most recent time this query was actively being processed, in Unix milliseconds. int64_t last_active_time_ms; - /// Request pool to which the request was submitted for admission, or an empty string - /// if this request doesn't have a pool. - std::string request_pool; + /// Resource pool to which the request was submitted for admission, or an empty + /// string if this request doesn't go through admission control. + std::string resource_pool; /// Initialise from an exec_state. If copy_profile is true, print the query /// profile to a string and copy that into this.profile (which is expensive), @@ -789,11 +789,11 @@ class ImpalaServer : public ImpalaServiceIf, void CancelFromThreadPool(uint32_t thread_id, const CancellationWork& cancellation_work); - /// Helper method to add any pool query options to the query_ctx. Must be called before - /// ExecuteInternal() at which point the TQueryCtx is const and cannot be mutated. - /// override_options_mask indicates which query options can be overridden by the pool - /// default query options. - void AddPoolQueryOptions(TQueryCtx* query_ctx, + /// Helper method to add the pool name and query options to the query_ctx. Must be + /// called before ExecuteInternal() at which point the TQueryCtx is const and cannot + /// be mutated. override_options_mask indicates which query options can be overridden + /// by the pool default query options. + void AddPoolConfiguration(TQueryCtx* query_ctx, const QueryOptionsMask& override_options_mask); /// Register timeout value upon opening a new session. This will wake up http://git-wip-us.apache.org/repos/asf/impala/blob/58ee5eca/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index dc37fc2..e96ed87 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -389,8 +389,8 @@ struct TQueryCtx { // List of tables with scan ranges that map to blocks with missing disk IDs. 15: optional list<CatalogObjects.TTableName> tables_missing_diskids - // The pool to which this request has been submitted. Used to update pool statistics - // for admission control. + // The resolved admission control pool to which this request will be submitted. May be + // unset for statements that aren't subjected to admission control (e.g. USE, SET). 16: optional string request_pool // String containing a timestamp (in UTC) set as the query submission time. It