http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index 5ee60d6..c8ebd5d 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -26,12 +26,10 @@ #include "common/global-types.h" #include "common/status.h" -#include "scheduling/query-resource-mgr.h" #include "util/promise.h" #include "util/runtime-profile.h" #include "gen-cpp/Types_types.h" // for TNetworkAddress #include "gen-cpp/Frontend_types.h" -#include "gen-cpp/ResourceBrokerService_types.h" namespace impala { @@ -74,30 +72,19 @@ class QuerySchedule { const TQueryOptions& query_options, RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events); - /// Returns OK if reservation_ contains a matching resource for each - /// of the hosts in fragment_exec_params_. Returns an error otherwise. - Status ValidateReservation(); - const TUniqueId& query_id() const { return query_id_; } const TQueryExecRequest& request() const { return request_; } const TQueryOptions& query_options() const { return query_options_; } const std::string& request_pool() const { return request_pool_; } void set_request_pool(const std::string& pool_name) { request_pool_ = pool_name; } - bool HasReservation() const { return !reservation_.allocated_resources.empty(); } - - /// Granted or timed out reservations need to be released. In both such cases, - /// the reservation_'s reservation_id is set. - bool NeedsRelease() const { return reservation_.__isset.reservation_id; } - /// Gets the estimated memory (bytes) and vcores per-node. Returns the user specified - /// estimate (MEM_LIMIT query parameter) if provided or the estimate from planning if - /// available, but is capped at the amount of physical memory to avoid problems if - /// either estimate is unreasonably large. + /// Gets the estimated memory (bytes) per-node. Returns the user specified estimate + /// (MEM_LIMIT query parameter) if provided or the estimate from planning if available, + /// but is capped at the amount of physical memory to avoid problems if either estimate + /// is unreasonably large. int64_t GetPerHostMemoryEstimate() const; - int16_t GetPerHostVCores() const; /// Total estimated memory for all nodes. set_num_hosts() must be set before calling. int64_t GetClusterMemoryEstimate() const; - void GetResourceHostport(const TNetworkAddress& src, TNetworkAddress* dst); /// Helper methods used by scheduler to populate this QuerySchedule. void AddScanRanges(int64_t delta) { num_scan_ranges_ += delta; } @@ -116,10 +103,6 @@ class QuerySchedule { const boost::unordered_set<TNetworkAddress>& unique_hosts() const { return unique_hosts_; } - TResourceBrokerReservationResponse* reservation() { return &reservation_; } - const TResourceBrokerReservationRequest& reservation_request() const { - return reservation_request_; - } bool is_admitted() const { return is_admitted_; } void set_is_admitted(bool is_admitted) { is_admitted_ = is_admitted; } RuntimeProfile* summary_profile() { return summary_profile_; } @@ -127,10 +110,6 @@ class QuerySchedule { void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts); - /// Populates reservation_request_ ready to submit a query to Llama for all initial - /// resources required for this query. - void PrepareReservationRequest(const std::string& pool, const std::string& user); - private: /// These references are valid for the lifetime of this query schedule because they @@ -165,18 +144,9 @@ class QuerySchedule { /// Request pool to which the request was submitted for admission. std::string request_pool_; - /// Reservation request to be submitted to Llama. Set in PrepareReservationRequest(). - TResourceBrokerReservationRequest reservation_request_; - - /// Fulfilled reservation request. Populated by scheduler. - TResourceBrokerReservationResponse reservation_; - /// Indicates if the query has been admitted for execution. bool is_admitted_; - /// Resolves unique_hosts_ to node mgr addresses. Valid only after SetUniqueHosts() has - /// been called. - boost::scoped_ptr<ResourceResolver> resource_resolver_; }; }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/request-pool-service.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc index 9f3363d..ea2553e 100644 --- a/be/src/scheduling/request-pool-service.cc +++ b/be/src/scheduling/request-pool-service.cc @@ -44,6 +44,7 @@ static const string DEFAULT_USER = "default"; DEFINE_string(fair_scheduler_allocation_path, "", "Path to the fair scheduler " "allocation file (fair-scheduler.xml)."); +// TODO: Rename / cleanup now that Llama is removed (see IMPALA-4159). DEFINE_string(llama_site_path, "", "Path to the Llama configuration file " "(llama-site.xml). If set, fair_scheduler_allocation_path must also be set."); @@ -74,7 +75,6 @@ DEFINE_bool(disable_pool_mem_limits, false, "Disables all per-pool mem limits.") DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on the " "maximum number of running requests."); -DECLARE_bool(enable_rm); // Pool name used when the configuration files are not specified. static const string DEFAULT_POOL_NAME = "default-pool"; @@ -94,12 +94,7 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) : resolve_pool_ms_metric_ = StatsMetric<double>::CreateAndRegister(metrics, RESOLVE_POOL_METRIC_NAME); - if (FLAGS_fair_scheduler_allocation_path.empty() && - FLAGS_llama_site_path.empty()) { - if (FLAGS_enable_rm) { - CLEAN_EXIT_WITH_ERROR("If resource management is enabled, " - "-fair_scheduler_allocation_path is required."); - } + if (FLAGS_fair_scheduler_allocation_path.empty()) { default_pool_only_ = true; bool is_percent; // not used int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_default_pool_mem_limit, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index 8b074d2..4c3a967 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -31,7 +31,6 @@ #include "gen-cpp/PlanNodes_types.h" #include "gen-cpp/Frontend_types.h" #include "gen-cpp/ImpalaInternalService_types.h" -#include "gen-cpp/ResourceBrokerService_types.h" namespace impala { @@ -57,21 +56,6 @@ class Scheduler { /// Releases the reserved resources (if any) from the given schedule. virtual Status Release(QuerySchedule* schedule) = 0; - /// Notifies this scheduler that a resource reservation has been preempted by the - /// central scheduler (Yarn via Llama). All affected queries are cancelled - /// via their coordinator. - virtual void HandlePreemptedReservation(const TUniqueId& reservation_id) = 0; - - /// Notifies this scheduler that a single resource with the given client resource id - /// has been preempted by the central scheduler (Yarn via Llama). All affected queries - /// are cancelled via their coordinator. - virtual void HandlePreemptedResource(const TUniqueId& client_resource_id) = 0; - - /// Notifies this scheduler that a single resource with the given client resource id - /// has been lost by the central scheduler (Yarn via Llama). All affected queries - /// are cancelled via their coordinator. - virtual void HandleLostResource(const TUniqueId& client_resource_id) = 0; - /// Initialises the scheduler, acquiring all resources needed to make /// scheduling decisions once this method returns. virtual Status Init() = 0; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc index 76da6f9..7116e21 100644 --- a/be/src/scheduling/simple-scheduler-test.cc +++ b/be/src/scheduling/simple-scheduler-test.cc @@ -830,8 +830,8 @@ class SchedulerWrapper { scheduler_backend_address.hostname = scheduler_host.ip; scheduler_backend_address.port = scheduler_host.be_port; - scheduler_.reset(new SimpleScheduler(NULL, scheduler_backend_id, - scheduler_backend_address, &metrics_, NULL, NULL, NULL)); + scheduler_.reset(new SimpleScheduler( + NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL)); scheduler_->Init(); // Initialize the scheduler backend maps. SendFullMembershipMap(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index f4b90cc..f3ba9a5 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc @@ -29,7 +29,6 @@ #include "common/logging.h" #include "util/metrics.h" -#include "resourcebroker/resource-broker.h" #include "runtime/exec-env.h" #include "runtime/coordinator.h" #include "service/impala-server.h" @@ -43,11 +42,9 @@ #include "util/container-util.h" #include "util/debug-util.h" #include "util/error-util.h" -#include "util/llama-util.h" #include "util/mem-info.h" #include "util/parse-util.h" #include "util/runtime-profile-counters.h" -#include "gen-cpp/ResourceBrokerService_types.h" #include "common/names.h" @@ -58,9 +55,6 @@ using namespace strings; DECLARE_int32(be_port); DECLARE_string(hostname); -DECLARE_bool(enable_rm); -DECLARE_int32(rm_default_cpu_vcores); -DECLARE_string(rm_default_memory); DEFINE_bool(disable_admission_control, false, "Disables admission control."); @@ -79,8 +73,7 @@ const string SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership"); SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber, const string& backend_id, const TNetworkAddress& backend_address, - MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker, - RequestPoolService* request_pool_service) + MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service) : backend_config_(std::make_shared<const BackendConfig>()), metrics_(metrics->GetOrCreateChildGroup("scheduler")), webserver_(webserver), @@ -90,7 +83,6 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber, total_assignments_(NULL), total_local_assignments_(NULL), initialized_(NULL), - resource_broker_(resource_broker), request_pool_service_(request_pool_service) { local_backend_descriptor_.address = backend_address; @@ -99,32 +91,10 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber, admission_controller_.reset( new AdmissionController(request_pool_service_, metrics, backend_address)); } - - if (FLAGS_enable_rm) { - if (FLAGS_rm_default_cpu_vcores <= 0) { - LOG(ERROR) << "Bad value for --rm_default_cpu_vcores (must be postive): " - << FLAGS_rm_default_cpu_vcores; - exit(1); - } - bool is_percent; - int64_t mem_bytes = - ParseUtil::ParseMemSpec( - FLAGS_rm_default_memory, &is_percent, MemInfo::physical_mem()); - if (mem_bytes <= 1024 * 1024) { - LOG(ERROR) << "Bad value for --rm_default_memory (must be larger than 1M):" - << FLAGS_rm_default_memory; - exit(1); - } else if (is_percent) { - LOG(ERROR) << "Must use absolute value for --rm_default_memory: " - << FLAGS_rm_default_memory; - exit(1); - } - } } SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends, - MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker, - RequestPoolService* request_pool_service) + MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service) : backend_config_(std::make_shared<const BackendConfig>(backends)), metrics_(metrics), webserver_(webserver), @@ -133,7 +103,6 @@ SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends, total_assignments_(NULL), total_local_assignments_(NULL), initialized_(NULL), - resource_broker_(resource_broker), request_pool_service_(request_pool_service) { DCHECK(backends.size() > 0); local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port); @@ -289,10 +258,7 @@ void SimpleScheduler::UpdateMembership( // If this impalad is not in our view of the membership list, we should add it and // tell the statestore. - bool is_offline = ExecEnv::GetInstance() && - ExecEnv::GetInstance()->impala_server()->IsOffline(); - if (!is_offline && - current_membership_.find(local_backend_id_) == current_membership_.end()) { + if (current_membership_.find(local_backend_id_) == current_membership_.end()) { VLOG(1) << "Registering local backend with statestore"; subscriber_topic_updates->push_back(TTopicDelta()); TTopicDelta& update = subscriber_topic_updates->back(); @@ -308,13 +274,6 @@ void SimpleScheduler::UpdateMembership( << " " << status.GetDetail(); subscriber_topic_updates->pop_back(); } - } else if (is_offline && - current_membership_.find(local_backend_id_) != current_membership_.end()) { - LOG(WARNING) << "Removing offline ImpalaServer from statestore"; - subscriber_topic_updates->push_back(TTopicDelta()); - TTopicDelta& update = subscriber_topic_updates->back(); - update.topic_name = IMPALA_MEMBERSHIP_TOPIC; - update.topic_deletions.push_back(local_backend_id_); } if (metrics_ != NULL) { num_fragment_instances_metric_->set_value(current_membership_.size()); @@ -626,7 +585,6 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request for (const FragmentExecParams& exec_params: *fragment_exec_params) { unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end()); } - schedule->SetUniqueHosts(unique_hosts); } @@ -723,38 +681,12 @@ Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) { schedule->set_request_pool(resolved_pool); schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool); - if (ExecEnv::GetInstance()->impala_server()->IsOffline()) { - return Status("This Impala server is offline. Please retry your query later."); - } - RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule)); ComputeFragmentHosts(schedule->request(), schedule); ComputeFragmentExecParams(schedule->request(), schedule); if (!FLAGS_disable_admission_control) { RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule)); } - if (!FLAGS_enable_rm) return Status::OK(); - string user = GetEffectiveUser(schedule->request().query_ctx.session); - if (user.empty()) user = "default"; - schedule->PrepareReservationRequest(resolved_pool, user); - const TResourceBrokerReservationRequest& reservation_request = - schedule->reservation_request(); - if (!reservation_request.resources.empty()) { - Status status = resource_broker_->Reserve( - reservation_request, schedule->reservation()); - if (!status.ok()) { - // Warn about missing table and/or column stats if necessary. - const TQueryCtx& query_ctx = schedule->request().query_ctx; - if (!query_ctx.__isset.parent_query_id && - query_ctx.__isset.tables_missing_stats && - !query_ctx.tables_missing_stats.empty()) { - status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats)); - } - return status; - } - RETURN_IF_ERROR(schedule->ValidateReservation()); - AddToActiveResourceMaps(*schedule->reservation(), coord); - } return Status::OK(); } @@ -762,106 +694,9 @@ Status SimpleScheduler::Release(QuerySchedule* schedule) { if (!FLAGS_disable_admission_control) { RETURN_IF_ERROR(admission_controller_->ReleaseQuery(schedule)); } - if (FLAGS_enable_rm && schedule->NeedsRelease()) { - DCHECK(resource_broker_ != NULL); - Status status = resource_broker_->ReleaseReservation( - schedule->reservation()->reservation_id); - // Remove the reservation from the active-resource maps even if there was an error - // releasing the reservation because the query running in the reservation is done. - RemoveFromActiveResourceMaps(*schedule->reservation()); - RETURN_IF_ERROR(status); - } return Status::OK(); } -void SimpleScheduler::AddToActiveResourceMaps( - const TResourceBrokerReservationResponse& reservation, Coordinator* coord) { - lock_guard<mutex> l(active_resources_lock_); - active_reservations_[reservation.reservation_id] = coord; - map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter; - for (iter = reservation.allocated_resources.begin(); - iter != reservation.allocated_resources.end(); - ++iter) { - TUniqueId client_resource_id; - client_resource_id << iter->second.client_resource_id; - active_client_resources_[client_resource_id] = coord; - } -} - -void SimpleScheduler::RemoveFromActiveResourceMaps( - const TResourceBrokerReservationResponse& reservation) { - lock_guard<mutex> l(active_resources_lock_); - active_reservations_.erase(reservation.reservation_id); - map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter; - for (iter = reservation.allocated_resources.begin(); - iter != reservation.allocated_resources.end(); - ++iter) { - TUniqueId client_resource_id; - client_resource_id << iter->second.client_resource_id; - active_client_resources_.erase(client_resource_id); - } -} - -// TODO: Refactor the Handle*{Reservation,Resource} functions to avoid code duplication. -void SimpleScheduler::HandlePreemptedReservation(const TUniqueId& reservation_id) { - VLOG_QUERY << "HandlePreemptedReservation client_id=" << reservation_id; - Coordinator* coord = NULL; - { - lock_guard<mutex> l(active_resources_lock_); - ActiveReservationsMap::iterator it = active_reservations_.find(reservation_id); - if (it != active_reservations_.end()) coord = it->second; - } - if (coord == NULL) { - LOG(WARNING) << "Ignoring preempted reservation id " << reservation_id - << " because no active query using it was found."; - } else { - stringstream err_msg; - err_msg << "Reservation " << reservation_id << " was preempted"; - Status status(err_msg.str()); - coord->Cancel(&status); - } -} - -void SimpleScheduler::HandlePreemptedResource(const TUniqueId& client_resource_id) { - VLOG_QUERY << "HandlePreemptedResource client_id=" << client_resource_id; - Coordinator* coord = NULL; - { - lock_guard<mutex> l(active_resources_lock_); - ActiveClientResourcesMap::iterator it = - active_client_resources_.find(client_resource_id); - if (it != active_client_resources_.end()) coord = it->second; - } - if (coord == NULL) { - LOG(WARNING) << "Ignoring preempted client resource id " << client_resource_id - << " because no active query using it was found."; - } else { - stringstream err_msg; - err_msg << "Resource " << client_resource_id << " was preempted"; - Status status(err_msg.str()); - coord->Cancel(&status); - } -} - -void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) { - VLOG_QUERY << "HandleLostResource preempting client_id=" << client_resource_id; - Coordinator* coord = NULL; - { - lock_guard<mutex> l(active_resources_lock_); - ActiveClientResourcesMap::iterator it = - active_client_resources_.find(client_resource_id); - if (it != active_client_resources_.end()) coord = it->second; - } - if (coord == NULL) { - LOG(WARNING) << "Ignoring lost client resource id " << client_resource_id - << " because no active query using it was found."; - } else { - stringstream err_msg; - err_msg << "Resource " << client_resource_id << " was lost"; - Status status(err_msg.str()); - coord->Cancel(&status); - } -} - SimpleScheduler::AssignmentCtx::AssignmentCtx( const BackendConfig& backend_config, IntCounter* total_assignments, IntCounter* total_local_assignments) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h index dd119c2..8c96dde 100644 --- a/be/src/scheduling/simple-scheduler.h +++ b/be/src/scheduling/simple-scheduler.h @@ -36,12 +36,10 @@ #include "scheduling/admission-controller.h" #include "scheduling/backend-config.h" #include "gen-cpp/Types_types.h" // for TNetworkAddress -#include "gen-cpp/ResourceBrokerService_types.h" #include "rapidjson/rapidjson.h" namespace impala { -class ResourceBroker; class Coordinator; class SchedulerWrapper; @@ -78,22 +76,18 @@ class SimpleScheduler : public Scheduler { /// - backend_address - the address that this backend listens on SimpleScheduler(StatestoreSubscriber* subscriber, const std::string& backend_id, const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver, - ResourceBroker* resource_broker, RequestPoolService* request_pool_service); + RequestPoolService* request_pool_service); /// Initialize with a list of <host:port> pairs in 'static' mode - i.e. the set of /// backends is fixed and will not be updated. SimpleScheduler(const std::vector<TNetworkAddress>& backends, MetricGroup* metrics, - Webserver* webserver, ResourceBroker* resource_broker, - RequestPoolService* request_pool_service); + Webserver* webserver, RequestPoolService* request_pool_service); /// Register with the subscription manager if required virtual impala::Status Init(); virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule); virtual Status Release(QuerySchedule* schedule); - virtual void HandlePreemptedReservation(const TUniqueId& reservation_id); - virtual void HandlePreemptedResource(const TUniqueId& client_resource_id); - virtual void HandleLostResource(const TUniqueId& client_resource_id); private: /// Map from a host's IP address to the next backend to be round-robin scheduled for @@ -306,27 +300,6 @@ class SimpleScheduler : public Scheduler { /// Current number of backends IntGauge* num_fragment_instances_metric_; - /// Protect active_reservations_ and active_client_resources_. - boost::mutex active_resources_lock_; - - /// Map from a Llama reservation id to the coordinator of the query using that - /// reservation. The map is used to cancel queries whose reservation has been preempted. - /// Entries are added in Schedule() calls that result in granted resource allocations. - /// Entries are removed in Release(). - typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveReservationsMap; - ActiveReservationsMap active_reservations_; - - /// Map from client resource id to the coordinator of the query using that resource. - /// The map is used to cancel queries whose resource(s) have been preempted. - /// Entries are added in Schedule() calls that result in granted resource allocations. - /// Entries are removed in Release(). - typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveClientResourcesMap; - ActiveClientResourcesMap active_client_resources_; - - /// Resource broker that mediates resource requests between Impala and the Llama. - /// Set to NULL if resource management is disabled. - ResourceBroker* resource_broker_; - /// Used for user-to-pool resolution and looking up pool configurations. Not owned by /// us. RequestPoolService* request_pool_service_; @@ -339,16 +312,6 @@ class SimpleScheduler : public Scheduler { BackendConfigPtr GetBackendConfig() const; void SetBackendConfig(const BackendConfigPtr& backend_config); - /// Add the granted reservation and resources to the active_reservations_ and - /// active_client_resources_ maps, respectively. - void AddToActiveResourceMaps( - const TResourceBrokerReservationResponse& reservation, Coordinator* coord); - - /// Remove the given reservation and resources from the active_reservations_ and - /// active_client_resources_ maps, respectively. - void RemoveFromActiveResourceMaps( - const TResourceBrokerReservationResponse& reservation); - /// Called asynchronously when an update is received from the subscription manager void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, std::vector<TTopicDelta>* subscriber_topic_updates); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index c749a6a..1b10aec 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -60,7 +60,6 @@ #include "service/query-exec-state.h" #include "scheduling/simple-scheduler.h" #include "util/bit-util.h" -#include "util/cgroups-mgr.h" #include "util/container-util.h" #include "util/debug-util.h" #include "util/error-util.h" @@ -177,9 +176,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents" " the maximum allowable timeout."); -DEFINE_string(local_nodemanager_url, "", "The URL of the local Yarn Node Manager's HTTP " - "interface, used to detect if the Node Manager fails"); -DECLARE_bool(enable_rm); +// TODO: Remove for Impala 3.0. +DEFINE_string(local_nodemanager_url, "", "Deprecated"); + DECLARE_bool(compact_catalog_topic); namespace impala { @@ -361,12 +360,6 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) query_expiration_thread_.reset(new Thread("impala-server", "query-expirer", bind<void>(&ImpalaServer::ExpireQueries, this))); - is_offline_ = false; - if (FLAGS_enable_rm) { - nm_failure_detection_thread_.reset(new Thread("impala-server", "nm-failure-detector", - bind<void>(&ImpalaServer::DetectNmFailures, this))); - } - exec_env_->SetImpalaServer(this); } @@ -783,9 +776,7 @@ Status ImpalaServer::ExecuteInternal( shared_ptr<QueryExecState>* exec_state) { DCHECK(session_state != NULL); *registered_exec_state = false; - if (IsOffline()) { - return Status("This Impala server is offline. Please retry your query later."); - } + exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(), this, session_state)); @@ -1938,81 +1929,6 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState( } } -void ImpalaServer::SetOffline(bool is_offline) { - lock_guard<mutex> l(is_offline_lock_); - is_offline_ = is_offline; - ImpaladMetrics::IMPALA_SERVER_READY->set_value(is_offline); -} - -void ImpalaServer::DetectNmFailures() { - DCHECK(FLAGS_enable_rm); - if (FLAGS_local_nodemanager_url.empty()) { - LOG(WARNING) << "No NM address set (--nm_addr is empty), no NM failure detection " - << "thread started"; - return; - } - // We only want a network address to open a socket to, for now. Get rid of http(s):// - // prefix, and split the string into hostname:port. - if (istarts_with(FLAGS_local_nodemanager_url, "http://")) { - FLAGS_local_nodemanager_url = - FLAGS_local_nodemanager_url.substr(string("http://").size()); - } else if (istarts_with(FLAGS_local_nodemanager_url, "https://")) { - FLAGS_local_nodemanager_url = - FLAGS_local_nodemanager_url.substr(string("https://").size()); - } - vector<string> components; - split(components, FLAGS_local_nodemanager_url, is_any_of(":")); - if (components.size() < 2) { - LOG(ERROR) << "Could not parse network address from --local_nodemanager_url, no NM" - << " failure detection thread started"; - return; - } - DCHECK_GE(components.size(), 2); - TNetworkAddress nm_addr = - MakeNetworkAddress(components[0], atoi(components[1].c_str())); - - MissedHeartbeatFailureDetector failure_detector(MAX_NM_MISSED_HEARTBEATS, - MAX_NM_MISSED_HEARTBEATS / 2); - struct addrinfo* addr; - if (getaddrinfo(nm_addr.hostname.c_str(), components[1].c_str(), NULL, &addr)) { - LOG(WARNING) << "Could not resolve NM address: " << nm_addr << ". Error was: " - << GetStrErrMsg(); - return; - } - LOG(INFO) << "Starting NM failure-detection thread, NM at: " << nm_addr; - // True if the last time through the loop Impala had failed, otherwise false. Used to - // only change the offline status when there's a change in state. - bool last_failure_state = false; - while (true) { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd >= 0) { - if (connect(sockfd, addr->ai_addr, sizeof(sockaddr)) < 0) { - failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, false); - } else { - failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, true); - } - ::close(sockfd); - } else { - LOG(ERROR) << "Could not create socket! Error was: " << GetStrErrMsg(); - } - bool is_failed = (failure_detector.GetPeerState(FLAGS_local_nodemanager_url) == - FailureDetector::FAILED); - if (is_failed != last_failure_state) { - if (is_failed) { - LOG(WARNING) << - "ImpalaServer is going offline while local node-manager connectivity is bad"; - } else { - LOG(WARNING) << - "Node-manager connectivity has been restored. ImpalaServer is now online"; - } - SetOffline(is_failed); - } - last_failure_state = is_failed; - SleepForMs(2000); - } - freeaddrinfo(addr); -} - void ImpalaServer::UpdateFilter(TUpdateFilterResult& result, const TUpdateFilterParams& params) { shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index f756932..2104c5e 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -240,12 +240,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas, std::vector<TTopicDelta>* topic_updates); - /// Returns true if Impala is offline (and not accepting queries), false otherwise. - bool IsOffline() { - boost::lock_guard<boost::mutex> l(is_offline_lock_); - return is_offline_; - } - /// Returns true if lineage logging is enabled, false otherwise. bool IsLineageLoggingEnabled(); @@ -633,15 +627,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// FLAGS_idle_query_timeout seconds. void ExpireQueries(); - /// Periodically opens a socket to FLAGS_local_nodemanager_url to check if the Yarn Node - /// Manager is running. If not, this method calls SetOffline(true), and when the NM - /// recovers, calls SetOffline(false). Only called (in nm_failure_detection_thread_) if - /// FLAGS_enable_rm is true. - void DetectNmFailures(); - - /// Set is_offline_ to the argument's value. - void SetOffline(bool offline); - /// Guards query_log_ and query_log_index_ boost::mutex query_log_lock_; @@ -963,15 +948,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set. boost::scoped_ptr<Thread> query_expiration_thread_; - - /// Container thread for DetectNmFailures(). - boost::scoped_ptr<Thread> nm_failure_detection_thread_; - - /// Protects is_offline_ - boost::mutex is_offline_lock_; - - /// True if Impala server is offline, false otherwise. - bool is_offline_; }; /// Create an ImpalaServer and Thrift servers. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impalad-main.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc index de1a56b..55d9ad6 100644 --- a/be/src/service/impalad-main.cc +++ b/be/src/service/impalad-main.cc @@ -54,6 +54,7 @@ DECLARE_int32(beeswax_port); DECLARE_int32(hs2_port); DECLARE_int32(be_port); DECLARE_string(principal); +DECLARE_bool(enable_rm); int ImpaladMain(int argc, char** argv) { InitCommonRuntime(argc, argv, true); @@ -66,6 +67,13 @@ int ImpaladMain(int argc, char** argv) { ABORT_IF_ERROR(HiveUdfCall::Init()); InitFeSupport(); + if (FLAGS_enable_rm) { + // TODO: Remove in Impala 3.0. + LOG(WARNING) << "*****************************************************************"; + LOG(WARNING) << "Llama support has been deprecated. FLAGS_enable_rm has no effect."; + LOG(WARNING) << "*****************************************************************"; + } + // start backend service for the coordinator on be_port ExecEnv exec_env; StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index cea24bb..eb710fb 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -21,7 +21,6 @@ #include "exprs/expr.h" #include "exprs/expr-context.h" -#include "resourcebroker/resource-broker.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" @@ -48,7 +47,6 @@ using namespace strings; DECLARE_int32(catalog_service_port); DECLARE_string(catalog_service_host); -DECLARE_bool(enable_rm); DECLARE_int64(max_result_cache_size); namespace impala { @@ -428,34 +426,16 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED; DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl); - if (FLAGS_enable_rm) { - DCHECK(exec_env_->resource_broker() != NULL); - } schedule_.reset(new QuerySchedule(query_id(), query_exec_request, exec_request_.query_options, &summary_profile_, query_events_)); coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_)); Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get()); - if (FLAGS_enable_rm) { - if (status.ok()) { - stringstream reservation_request_ss; - reservation_request_ss << schedule_->reservation_request(); - summary_profile_.AddInfoString("Resource reservation request", - reservation_request_ss.str()); - } - } { lock_guard<mutex> l(lock_); RETURN_IF_ERROR(UpdateQueryStatus(status)); } - if (FLAGS_enable_rm && schedule_->HasReservation()) { - // Add the granted reservation to the query profile. - stringstream reservation_ss; - reservation_ss << *schedule_->reservation(); - summary_profile_.AddInfoString("Granted resource reservation", reservation_ss.str()); - query_events_->MarkEvent("Resources reserved"); - } status = coord_->Exec(*schedule_, &output_expr_ctxs_); { lock_guard<mutex> l(lock_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index da682f4..62fff80 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -36,7 +36,6 @@ add_library(Util bitmap.cc bit-util.cc bloom-filter.cc - cgroups-mgr.cc coding-util.cc codec.cc compress.cc @@ -54,7 +53,6 @@ add_library(Util hdr-histogram.cc impalad-metrics.cc jni-util.cc - llama-util.cc logging-support.cc mem-info.cc memory-metrics.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/cgroups-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/util/cgroups-mgr.cc b/be/src/util/cgroups-mgr.cc deleted file mode 100644 index e49d57c..0000000 --- a/be/src/util/cgroups-mgr.cc +++ /dev/null @@ -1,238 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/cgroups-mgr.h" - -#include <fstream> -#include <sstream> -#include <boost/filesystem.hpp> -#include "util/debug-util.h" -#include <gutil/strings/substitute.h> - -#include "common/names.h" - -using boost::filesystem::create_directory; -using boost::filesystem::exists; -using boost::filesystem::remove; -using namespace impala; -using namespace strings; - -namespace impala { - -// Suffix appended to Yarn resource ids to form an Impala-internal cgroups. -const std::string IMPALA_CGROUP_SUFFIX = "_impala"; - -// Yarn's default multiplier for translating virtual CPU cores into cgroup CPU shares. -// See Yarn's CgroupsLCEResourcesHandler.java for more details. -const int32_t CPU_DEFAULT_WEIGHT = 1024; - -CgroupsMgr::CgroupsMgr(MetricGroup* metrics) { - active_cgroups_metric_ = metrics->AddGauge<int64_t>("cgroups-mgr.active-cgroups", 0); -} - -Status CgroupsMgr::Init(const string& cgroups_hierarchy_path, - const string& staging_cgroup) { - cgroups_hierarchy_path_ = cgroups_hierarchy_path; - staging_cgroup_ = staging_cgroup; - // Set up the staging cgroup for Impala to retire execution threads into. - RETURN_IF_ERROR(CreateCgroup(staging_cgroup, true)); - return Status::OK(); -} - -string CgroupsMgr::UniqueIdToCgroup(const string& unique_id) const { - if (unique_id.empty()) return ""; - return unique_id + IMPALA_CGROUP_SUFFIX; -} - -int32_t CgroupsMgr::VirtualCoresToCpuShares(int16_t v_cpu_cores) { - if (v_cpu_cores <= 0) return -1; - return CPU_DEFAULT_WEIGHT * v_cpu_cores; -} - -Status CgroupsMgr::CreateCgroup(const string& cgroup, bool if_not_exists) const { - const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup); - try { - // Returns false if the dir already exists, otherwise throws an exception. - if (!create_directory(cgroup_path) && !if_not_exists) { - stringstream err_msg; - err_msg << "Failed to create CGroup at path " << cgroup_path - << ". Path already exists."; - return Status(err_msg.str()); - } - LOG(INFO) << "Created CGroup " << cgroup_path; - } catch (std::exception& e) { - stringstream err_msg; - err_msg << "Failed to create CGroup at path " << cgroup_path << ". " << e.what(); - return Status(err_msg.str()); - } - return Status::OK(); -} - -Status CgroupsMgr::DropCgroup(const string& cgroup, bool if_exists) const { - const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup); - LOG(INFO) << "Dropping CGroup " << cgroups_hierarchy_path_ << " " << cgroup; - try { - if(!remove(cgroup_path) && !if_exists) { - stringstream err_msg; - err_msg << "Failed to create CGroup at path " << cgroup_path - << ". Path does not exist."; - return Status(err_msg.str()); - } - } catch (std::exception& e) { - stringstream err_msg; - err_msg << "Failed to drop CGroup at path " << cgroup_path << ". " << e.what(); - return Status(err_msg.str()); - } - return Status::OK(); -} - -Status CgroupsMgr::SetCpuShares(const string& cgroup, int32_t num_shares) { - string cgroup_path; - string tasks_path; - RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path)); - - const string& cpu_shares_path = Substitute("$0/$1", cgroup_path, "cpu.shares"); - ofstream cpu_shares(tasks_path.c_str(), ios::out | ios::trunc); - if (!cpu_shares.is_open()) { - stringstream err_msg; - err_msg << "CGroup CPU shares file: " << cpu_shares_path - << " is not writable by Impala"; - return Status(err_msg.str()); - } - - LOG(INFO) << "Setting CPU shares of CGroup " << cgroup_path << " to " << num_shares; - cpu_shares << num_shares << endl; - return Status::OK(); -} - -Status CgroupsMgr::GetCgroupPaths(const std::string& cgroup, - std::string* cgroup_path, std::string* tasks_path) const { - stringstream cgroup_path_ss; - cgroup_path_ss << cgroups_hierarchy_path_ << "/" << cgroup; - *cgroup_path = cgroup_path_ss.str(); - if (!exists(*cgroup_path)) { - stringstream err_msg; - err_msg << "CGroup " << *cgroup_path << " does not exist"; - return Status(err_msg.str()); - } - - stringstream tasks_path_ss; - tasks_path_ss << *cgroup_path << "/tasks"; - *tasks_path = tasks_path_ss.str(); - if (!exists(*tasks_path)) { - stringstream err_msg; - err_msg << "CGroup " << *cgroup_path << " does not have a /tasks file"; - return Status(err_msg.str()); - } - return Status::OK(); -} - -Status CgroupsMgr::AssignThreadToCgroup(const Thread& thread, - const string& cgroup) const { - string cgroup_path; - string tasks_path; - RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path)); - - ofstream tasks(tasks_path.c_str(), ios::out | ios::app); - if (!tasks.is_open()) { - stringstream err_msg; - err_msg << "CGroup tasks file: " << tasks_path << " is not writable by Impala"; - return Status(err_msg.str()); - } - tasks << thread.tid() << endl; - - VLOG_ROW << "Thread " << thread.tid() << " moved to CGroup " << cgroup_path; - tasks.close(); - return Status::OK(); -} - -Status CgroupsMgr::RelocateThreads(const string& src_cgroup, - const string& dst_cgroup) const { - string src_cgroup_path; - string src_tasks_path; - RETURN_IF_ERROR(GetCgroupPaths(src_cgroup, &src_cgroup_path, &src_tasks_path)); - - string dst_cgroup_path; - string dst_tasks_path; - RETURN_IF_ERROR(GetCgroupPaths(dst_cgroup, &dst_cgroup_path, &dst_tasks_path)); - - ifstream src_tasks(src_tasks_path.c_str()); - if (!src_tasks) { - stringstream err_msg; - err_msg << "Failed to open source CGroup tasks file at: " << src_tasks_path; - return Status(err_msg.str()); - } - - ofstream dst_tasks(dst_tasks_path.c_str(), ios::out | ios::app); - if (!dst_tasks) { - stringstream err_msg; - err_msg << "Failed to open destination CGroup tasks file at: " << dst_tasks_path; - return Status(err_msg.str()); - } - - int32_t tid; - while (src_tasks >> tid) { - dst_tasks << tid << endl; - // Attempting to write a non-existent tid/pid will result in an error, - // so clear the error flags after every append. - dst_tasks.clear(); - VLOG_ROW << "Relocating thread id " << tid << " from " << src_tasks_path - << " to " << dst_tasks_path; - } - - return Status::OK(); -} - -Status CgroupsMgr::RegisterFragment(const TUniqueId& fragment_instance_id, - const string& cgroup, bool* is_first) { - if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK(); - - LOG(INFO) << "Registering fragment " << PrintId(fragment_instance_id) - << " with CGroup " << cgroups_hierarchy_path_ << "/" << cgroup; - lock_guard<mutex> l(active_cgroups_lock_); - if (++active_cgroups_[cgroup] == 1) { - *is_first = true; - RETURN_IF_ERROR(CreateCgroup(cgroup, false)); - active_cgroups_metric_->Increment(1); - } else { - *is_first = false; - } - return Status::OK(); -} - -Status CgroupsMgr::UnregisterFragment(const TUniqueId& fragment_instance_id, - const string& cgroup) { - if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK(); - - LOG(INFO) << "Unregistering fragment " << PrintId(fragment_instance_id) - << " from CGroup " << cgroups_hierarchy_path_ << "/" << cgroup; - lock_guard<mutex> l(active_cgroups_lock_); - unordered_map<string, int32_t>::iterator entry = active_cgroups_.find(cgroup); - DCHECK(entry != active_cgroups_.end()); - - int32_t* ref_count = &entry->second; - --(*ref_count); - if (*ref_count == 0) { - RETURN_IF_ERROR(RelocateThreads(cgroup, staging_cgroup_)); - RETURN_IF_ERROR(DropCgroup(cgroup, false)); - active_cgroups_metric_->Increment(-1); - active_cgroups_.erase(entry); - } - return Status::OK(); -} - -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/cgroups-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/util/cgroups-mgr.h b/be/src/util/cgroups-mgr.h deleted file mode 100644 index 2e52f6b..0000000 --- a/be/src/util/cgroups-mgr.h +++ /dev/null @@ -1,175 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_UTIL_CGROUPS_MGR_H -#define IMPALA_UTIL_CGROUPS_MGR_H - -#include <string> -#include <boost/thread/mutex.hpp> -#include <boost/unordered_map.hpp> -#include "common/status.h" -#include "util/metrics.h" -#include "util/thread.h" - -namespace impala { - -/// Control Groups, or 'cgroups', are a Linux-specific mechanism for arbitrating resources -/// amongst threads. -// -/// CGroups are organised in a forest of 'hierarchies', each of which are mounted at a path -/// in the filesystem. Each hierarchy contains one or more cgroups, arranged -/// hierarchically. Each hierarchy has one or more 'subsystems' attached. Each subsystem -/// represents a resource to manage, so for example there is a CPU subsystem and a MEMORY -/// subsystem. There are rules about when subsystems may be attached to more than one -/// hierarchy, which are out of scope of this description. -// -/// Each thread running on a kernel with cgroups enabled belongs to exactly one cgroup in -/// every hierarchy at once. Impala is only concerned with a single hierarchy that assigns -/// CPU resources in the first instance. Threads are assigned to cgroups by writing their -/// thread ID to a file in the special cgroup filesystem. -// -/// For more information: -/// access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/ -/// www.kernel.org/doc/Documentation/cgroups/cgroups.txt - -/// Manages the lifecycle of Impala-internal cgroups as well as the assignment of -/// execution threads into cgroups. -/// To execute queries Impala requests resources from Yarn via the Llama. Yarn returns -/// granted resources via the Llama in the form or "RM resource ids" that conventionally -/// correspond to a CGroups that the Yarn NM creates. Instead of directly using the -/// NM-provided CGroups, Impala creates and manages its own CGroups for the -/// following reasons: -/// 1. In typical CM/Yarn setups, Impala would not have permissions to write to the tasks -/// file of NM-provided CGroups. It is arguably not even desirable (e.g., for security -/// reasons) for external process to be able to manipulate the permissions of -/// NM-generated CGroups either directly or indirectly. -/// 2. Yarn-granted CGroups are created asynchronously (the AM calls to create the -/// CGroups are non-blocking). From Impala's perspective that means that once Impala -/// receives notice from the Llama that resources have been granted, it cannot -/// assume that the corresponding containers have been created (although the Yarn -/// NMs eventually will). While each of Impala's plan fragments could wait for the -/// CGroups to be created, it seems unnecessarily complicated and slow to do so. -/// 3. Impala will probably want to manage its own CGroups eventually, e.g., for -/// optimistic query scheduling. -// -/// In summary, the typical CGroups-related flow of an Impala query is as follows: -/// 1. Impala receives granted resources from Llama and sends out plan fragments -/// 2. On each node execution such a fragment, convert the Yarn resource id into -/// a CGroup that Impala should create and assign the query's threads to -/// 3. Register the fragment(s) and the CGroup for the query with the -/// node-local CGroup manager. The registration creates the CGroup maintains a -/// count of all fragments using that CGroup. -/// 4. Execute the fragments, assigning threads into the Impala-managed CGroup. -/// 5. Complete the fragments by unregistering them with the CGroup from the node-local -/// CGroups manager. When the last fragment for a CGroup is unregistered, all threads -/// from that CGroup are relocated into a special staging CGroup, so that the now -/// unused CGroup can safely be deleted (otherwise, we'd have to wait for the OS to -/// drain all entries from the CGroup's tasks file) -class CgroupsMgr { - public: - CgroupsMgr(MetricGroup* metrics); - - /// Sets the cgroups mgr's corresponding members and creates the staging cgroup - /// under <cgroups_hierarchy_path>/<staging_cgroup>. Returns a non-OK status if - /// creation of the staging cgroup failed, e.g., because of insufficient privileges. - Status Init(const std::string& cgroups_hierarchy_path, - const std::string& staging_cgroup); - - /// Returns the cgroup Impala should create and use for enforcing granted resources - /// identified by the given unique ID (which usually corresponds to a query ID). Returns - /// an empty string if unique_id is empty. - std::string UniqueIdToCgroup(const std::string& unique_id) const; - - /// Returns the cgroup CPU shares corresponding to the given number of virtual cores. - /// Returns -1 if v_cpu_cores is <= 0 (which is invalid). - int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores); - - /// Informs the cgroups mgr that a plan fragment intends to use the given cgroup. - /// If this is the first fragment requesting use of cgroup, then the cgroup will - /// be created and *is_first will be set to true (otherwise to false). In any case the - /// reference count active_cgroups_[cgroup] is incremented. Returns a non-OK status - /// if there was an error creating the cgroup. - Status RegisterFragment(const TUniqueId& fragment_instance_id, - const std::string& cgroup, bool* is_first); - - /// Informs the cgroups mgr that a plan fragment using the given cgroup is complete. - /// Decrements the corresponding reference count active_cgroups_[cgroup]. If the - /// reference count reaches zero this function relocates all thread ids from - /// the cgroup to the staging_cgroup_ and drops cgroup (a cgroup with active thread ids - /// cannot be dropped, so we relocate the thread ids first). - /// Returns a non-OK status there was an error creating the cgroup. - Status UnregisterFragment(const TUniqueId& fragment_instance_id, - const std::string& cgroup); - - /// Creates a cgroup at <cgroups_hierarchy_path_>/<cgroup>. Returns a non-OK status - /// if the cgroup creation failed, e.g., because of insufficient privileges. - /// If is_not_exists is true then no error is returned if the cgroup already exists. - Status CreateCgroup(const std::string& cgroup, bool if_not_exists) const; - - /// Drops the cgroup at <cgroups_hierarchy_path_>/<cgroup>. Returns a non-OK status - /// if the cgroup deletion failed, e.g., because of insufficient privileges. - /// If if_exists is true then no error is returned if the cgroup does not exist. - Status DropCgroup(const std::string& cgroup, bool if_exists) const; - - /// Sets the number of CPU shares for the given cgroup by writing num_shares into the - /// cgroup's cpu.shares file. Returns a non-OK status if there was an error writing - /// to the file, e.g., because of insufficient privileges. - Status SetCpuShares(const std::string& cgroup, int32_t num_shares); - - /// Assigns a given thread to a cgroup, by writing its thread id to - /// <cgroups_hierarchy_path_>/<cgroup>/tasks. If there is no file at that - /// location, returns an error. Otherwise no attempt is made to check that the - /// target belongs to a cgroup hierarchy due to the cost of reading and parsing - /// cgroup information from the filesystem. - Status AssignThreadToCgroup(const Thread& thread, const std::string& cgroup) const; - - /// Reads the <cgroups_hierarchy_path_>/<src_cgroup>/tasks file and writing all the - /// contained thread ids to <cgroups_hierarchy_path_>/<dst_cgroup>/tasks. - /// Assumes that the destination cgroup has already been created. Returns a non-OK - /// status if there was an error reading src_cgroup and/or writing dst_cgroup. - Status RelocateThreads(const std::string& src_cgroup, - const std::string& dst_cgroup) const; - - private: - /// Checks that the cgroups_hierarchy_path_ and the given cgroup under it exists. - /// Returns an error if either of them do not exist. - /// Returns the absolute cgroup path and the absolute path to its tasks file. - Status GetCgroupPaths(const std::string& cgroup, - std::string* cgroup_path, std::string* tasks_path) const; - - /// Number of currently active Impala-managed cgroups. - IntGauge* active_cgroups_metric_; - - /// Root of the CPU cgroup hierarchy. Created cgroups are placed directly under it. - std::string cgroups_hierarchy_path_; - - /// Cgroup that threads from completed queries are relocated into such that the - /// query's cgroup can be dropped. - std::string staging_cgroup_; - - /// Protects active_cgroups_. - boost::mutex active_cgroups_lock_; - - /// Process-wide map from cgroup to number of fragments using the cgroup. - /// A cgroup can be safely dropped once the number of fragments in the cgroup, - /// according to this map, reaches zero. - boost::unordered_map<std::string, int32_t> active_cgroups_; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/debug-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h index 4b17ed7..a02a288 100644 --- a/be/src/util/debug-util.h +++ b/be/src/util/debug-util.h @@ -32,7 +32,6 @@ #include "gen-cpp/RuntimeProfile_types.h" #include "gen-cpp/ImpalaService_types.h" #include "gen-cpp/parquet_types.h" -#include "gen-cpp/Llama_types.h" #include "runtime/descriptors.h" // for SchemaPath http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/llama-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/llama-util.cc b/be/src/util/llama-util.cc deleted file mode 100644 index 82f2bd6..0000000 --- a/be/src/util/llama-util.cc +++ /dev/null @@ -1,152 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/llama-util.h" - -#include <sstream> -#include <boost/algorithm/string/join.hpp> -#include <boost/algorithm/string.hpp> - -#include "common/names.h" -#include "util/debug-util.h" -#include "util/uid-util.h" - -using boost::algorithm::is_any_of; -using boost::algorithm::join; -using boost::algorithm::split; -using namespace llama; - -namespace llama { - -string PrintId(const TUniqueId& id, const string& separator) { - return PrintId(impala::CastTUniqueId<TUniqueId, impala::TUniqueId>(id), separator); -} - -ostream& operator<<(ostream& os, const TUniqueId& id) { - os << hex << id.hi << ":" << id.lo; - return os; -} - -ostream& operator<<(ostream& os, const TNetworkAddress& address) { - os << address.hostname << ":" << dec << address.port; - return os; -} - -ostream& operator<<(ostream& os, const TResource& resource) { - os << "Resource(" - << "client_resource_id=" << resource.client_resource_id << " " - << "v_cpu_cores=" << dec << resource.v_cpu_cores << " " - << "memory_mb=" << dec << resource.memory_mb << " " - << "asked_location=" << resource.askedLocation << " " - << "enforcement=" << resource.enforcement << ")"; - return os; -} - -ostream& operator<<(ostream& os, const TAllocatedResource& resource) { - os << "Allocated Resource(" - << "reservation_id=" << resource.reservation_id << " " - << "client_resource_id=" << resource.client_resource_id << " " - << "rm_resource_id=" << resource.rm_resource_id << " " - << "v_cpu_cores=" << dec << resource.v_cpu_cores << " " - << "memory_mb=" << dec << resource.memory_mb << " " - << "location=" << resource.location << ")"; - return os; -} - -ostream& operator<<(ostream& os, const llama::TLlamaAMGetNodesRequest& request) { - os << "GetNodes Request(llama handle=" << request.am_handle << ")"; - return os; -} - -ostream& operator<<(ostream& os, const llama::TLlamaAMReservationRequest& request) { - os << "Reservation Request(" - << "llama handle=" << request.am_handle << " " - << "queue=" << request.queue << " " - << "user=" << request.user << " " - << "gang=" << request.gang << " " - << "resources=["; - for (int i = 0; i < request.resources.size(); ++i) { - os << request.resources[i]; - if (i + 1 != request.resources.size()) os << ","; - } - os << "])"; - return os; -} - -ostream& operator<<(ostream& os, - const llama::TLlamaAMReservationExpansionRequest& request) { - os << "Expansion Request(" - << "llama handle=" << request.am_handle << " " - << "reservation id=" << request.expansion_of << " " - << "resource=" << request.resource << ")"; - return os; -} - -ostream& operator<<(ostream& os, const llama::TLlamaAMReleaseRequest& request) { - os << "Release Request(" - << "llama handle=" << request.am_handle << " " - << "reservation id=" << request.reservation_id << ")"; - return os; -} - -llama::TUniqueId& operator<<(llama::TUniqueId& dest, const impala::TUniqueId& src) { - dest.lo = src.lo; - dest.hi = src.hi; - return dest; -} - -impala::TUniqueId& operator<<(impala::TUniqueId& dest, const llama::TUniqueId& src) { - dest.lo = src.lo; - dest.hi = src.hi; - return dest; -} - -bool operator==(const impala::TUniqueId& impala_id, const llama::TUniqueId& llama_id) { - return impala_id.lo == llama_id.lo && impala_id.hi == llama_id.hi; -} - -llama::TNetworkAddress& operator<<(llama::TNetworkAddress& dest, - const impala::TNetworkAddress& src) { - dest.hostname = src.hostname; - dest.port = src.port; - return dest; -} - -impala::TNetworkAddress& operator<<(impala::TNetworkAddress& dest, - const llama::TNetworkAddress& src) { - dest.hostname = src.hostname; - dest.port = src.port; - return dest; -} - -impala::Status LlamaStatusToImpalaStatus(const TStatus& status, - const string& err_prefix) { - if (status.status_code == TStatusCode::OK) return impala::Status::OK(); - stringstream ss; - ss << err_prefix << " " << join(status.error_msgs, ", "); - return impala::Status(ss.str()); -} - -string GetShortName(const string& user) { - if (user.empty() || user[0] == '/' || user[0] == '@') return user; - - vector<string> components; - split(components, user, is_any_of("/@")); - return components[0]; -} - -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/llama-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/llama-util.h b/be/src/util/llama-util.h deleted file mode 100644 index f6fc4ce..0000000 --- a/be/src/util/llama-util.h +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_UTIL_LLAMA_UTIL_H -#define IMPALA_UTIL_LLAMA_UTIL_H - -#include <ostream> -#include <string> -#include <boost/functional/hash.hpp> - -#include "gen-cpp/Types_types.h" // for TUniqueId -#include "gen-cpp/Llama_types.h" // for TUniqueId -#include "common/status.h" - -namespace llama { - -std::ostream& operator<<(std::ostream& os, const llama::TUniqueId& id); -std::ostream& operator<<(std::ostream& os, const llama::TNetworkAddress& address); -std::ostream& operator<<(std::ostream& os, const llama::TResource& resource); -std::ostream& operator<<(std::ostream& os, const llama::TAllocatedResource& resource); - -std::ostream& operator<<(std::ostream& os, - const llama::TLlamaAMGetNodesRequest& request); -std::ostream& operator<<(std::ostream& os, - const llama::TLlamaAMReservationRequest& request); -std::ostream& operator<<(std::ostream& os, - const llama::TLlamaAMReservationExpansionRequest& request); -std::ostream& operator<<(std::ostream& os, - const llama::TLlamaAMReleaseRequest& request); - -/// 'Assignment' operators to convert types between the llama and impala namespaces. -llama::TUniqueId& operator<<(llama::TUniqueId& dest, const impala::TUniqueId& src); -impala::TUniqueId& operator<<(impala::TUniqueId& dest, const llama::TUniqueId& src); - -std::string PrintId(const llama::TUniqueId& id, const std::string& separator = ":"); - -bool operator==(const impala::TUniqueId& impala_id, const llama::TUniqueId& llama_id); - -llama::TNetworkAddress& operator<<(llama::TNetworkAddress& dest, - const impala::TNetworkAddress& src); -impala::TNetworkAddress& operator<<(impala::TNetworkAddress& dest, - const llama::TNetworkAddress& src); - -impala::Status LlamaStatusToImpalaStatus(const llama::TStatus& status, - const std::string& err_prefix = ""); - -/// This function must be called 'hash_value' to be picked up by boost. -inline std::size_t hash_value(const llama::TUniqueId& id) { - std::size_t seed = 0; - boost::hash_combine(seed, id.lo); - boost::hash_combine(seed, id.hi); - return seed; -} - -/// Get the short version of the user name (the user's name up to the first '/' or '@') -/// If neither are found (or are found at the beginning of the user name) return username. -std::string GetShortName(const std::string& user); - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread-pool.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h index 8fc1bcc..ccc49c9 100644 --- a/be/src/util/thread-pool.h +++ b/be/src/util/thread-pool.h @@ -114,10 +114,6 @@ class ThreadPool { Join(); } - Status AssignToCgroup(const std::string& cgroup) { - return threads_.SetCgroup(cgroup); - } - private: /// Driver method for each thread in the pool. Continues to read work from the queue /// until the pool is shutdown. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread.cc ---------------------------------------------------------------------- diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc index 6a8b9b8..757ba59 100644 --- a/be/src/util/thread.cc +++ b/be/src/util/thread.cc @@ -26,7 +26,6 @@ #include "util/coding-util.h" #include "util/debug-util.h" #include "util/error-util.h" -#include "util/cgroups-mgr.h" #include "util/metrics.h" #include "util/webserver.h" #include "util/os-util.h" @@ -321,10 +320,6 @@ void Thread::SuperviseThread(const string& name, const string& category, Status ThreadGroup::AddThread(Thread* thread) { threads_.push_back(thread); - if (!cgroup_path_.empty()) { - DCHECK(cgroups_mgr_ != NULL); - RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(*thread, cgroup_path_)); - } return Status::OK(); } @@ -332,13 +327,4 @@ void ThreadGroup::JoinAll() { for (const Thread& thread: threads_) thread.Join(); } -Status ThreadGroup::SetCgroup(const string& cgroup) { - DCHECK(cgroups_mgr_ != NULL); - cgroup_path_ = cgroup; - for (const Thread& t: threads_) { - RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(t, cgroup)); - } - return Status::OK(); -} - } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread.h b/be/src/util/thread.h index 8c880d2..4e2b65d 100644 --- a/be/src/util/thread.h +++ b/be/src/util/thread.h @@ -31,7 +31,6 @@ namespace impala { class MetricGroup; class Webserver; -class CgroupsMgr; /// Thin wrapper around boost::thread that can register itself with the singleton /// ThreadMgr (a private class implemented in thread.cc entirely, which tracks all live @@ -165,39 +164,19 @@ class ThreadGroup { public: ThreadGroup() {} - ThreadGroup(CgroupsMgr* cgroups_mgr, const std::string& cgroup) - : cgroups_mgr_(cgroups_mgr), cgroup_path_(cgroup) { } - /// Adds a new Thread to this group. The ThreadGroup takes ownership of the Thread, and /// will destroy it when the ThreadGroup is destroyed. Threads will linger until that /// point (even if terminated), however, so callers should be mindful of the cost of /// placing very many threads in this set. - /// If cgroup_path_ / cgroup_prefix_ are set, the thread will be added to the specified - /// cgroup and an error will be returned if that operation fails. Status AddThread(Thread* thread); /// Waits for all threads to finish. DO NOT call this from a thread inside this set; /// deadlock will predictably ensue. void JoinAll(); - /// Assigns all current and future threads to the given cgroup managed by cgroups_mgr_. - /// Must be called after SetCgroupsMgr() if groups_mgr_ has not been set already. - /// Returns an error if any assignment was not possible, but does not undo previously - /// successful assignments. - Status SetCgroup(const std::string& cgroup); - - void SetCgroupsMgr(CgroupsMgr* cgroups_mgr) { cgroups_mgr_ = cgroups_mgr; } - private: /// All the threads grouped by this set. boost::ptr_vector<Thread> threads_; - - /// Cgroups manager for assigning threads in this group to cgroups. Not owned. - CgroupsMgr* cgroups_mgr_; - - /// If not empty, every thread added to this group will also be placed in the - /// cgroup_path_ managed by the cgroups_mgr_. - std::string cgroup_path_; }; /// Initialises the threading subsystem. Must be called before a Thread is created. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/uid-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h index c78a9ea..f0f87ec 100644 --- a/be/src/util/uid-util.h +++ b/be/src/util/uid-util.h @@ -37,10 +37,7 @@ inline std::size_t hash_value(const impala::TUniqueId& id) { return seed; } -/// Templated so that this method is not namespace-specific (since we also call this on -/// llama::TUniqueId) -template <typename T> -inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, T* unique_id) { +inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id) { memcpy(&(unique_id->hi), &uuid.data[0], 8); memcpy(&(unique_id->lo), &uuid.data[8], 8); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/bootstrap_toolchain.py ---------------------------------------------------------------------- diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py index d8621c4..6524e82 100755 --- a/bin/bootstrap_toolchain.py +++ b/bin/bootstrap_toolchain.py @@ -22,7 +22,7 @@ # that we can deduce the version settings of the dependencies from the environment. # IMPALA_TOOLCHAIN indicates the location where the prebuilt artifacts should be extracted # to. If DOWNLOAD_CDH_COMPONENTS is set to true, this script will also download and extract -# the CDH components (i.e. Hadoop, Hive, HBase, Llama, Llama-minikdc and Sentry) into +# the CDH components (i.e. Hadoop, Hive, HBase and Sentry) into # CDH_COMPONENTS_HOME. # # The script is called as follows without any additional parameters: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/create-test-configuration.sh ---------------------------------------------------------------------- diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh index bbe8f61..8d695a4 100755 --- a/bin/create-test-configuration.sh +++ b/bin/create-test-configuration.sh @@ -148,7 +148,7 @@ if ${CLUSTER_DIR}/admin is_kerberized; then # strange, but making these symlinks also results in data loading # failures in the non-kerberized case. Without these, mapreduce # jobs die in a kerberized cluster because they can't find their - # kerberos principals. Obviously this has to be sorted out before + # kerberos principals. Obviously this has to be sorted out before # a kerberized cluster can load data. echo "Linking yarn and mapred from local cluster" ln -s ${CLUSTER_HADOOP_CONF_DIR}/yarn-site.xml http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/generate_minidump_collection_testdata.py ---------------------------------------------------------------------- diff --git a/bin/generate_minidump_collection_testdata.py b/bin/generate_minidump_collection_testdata.py index 27f9a42..a408e05 100755 --- a/bin/generate_minidump_collection_testdata.py +++ b/bin/generate_minidump_collection_testdata.py @@ -71,7 +71,6 @@ CONFIG_FILE = '''-beeswax_port=21000 -max_lineage_log_file_size=5000 -hostname=vb0204.halxg.cloudera.com -state_store_host=vb0202.halxg.cloudera.com --enable_rm=false -state_store_port=24000 -catalog_service_host=vb0202.halxg.cloudera.com -catalog_service_port=26000 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/start-impala-cluster.py ---------------------------------------------------------------------- diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index bfde4c3..b92fcf1 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -41,8 +41,6 @@ parser.add_option("--build_type", dest="build_type", default= 'latest', parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string", default=[], help="Additional arguments to pass to each Impalad during startup") -parser.add_option("--enable_rm", dest="enable_rm", action="store_true", default=False, - help="Enable resource management with Yarn and Llama.") parser.add_option("--state_store_args", dest="state_store_args", action="append", type="string", default=[], help="Additional arguments to pass to State Store during startup") @@ -91,8 +89,6 @@ IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d -be_port=%d " "-llama_callback_port=%d") JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s" BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s" -RM_ARGS = ("-enable_rm=true -llama_addresses=%s -cgroup_hierarchy_path=%s " - "-fair_scheduler_allocation_path=%s") CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240 # Kills have a timeout to prevent automated scripts from hanging indefinitely. # It is set to a high value to avoid failing if processes are slow to shut down. @@ -208,20 +204,6 @@ def build_jvm_args(instance_num): BASE_JVM_DEBUG_PORT = 30000 return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args) -def build_rm_args(instance_num): - if not options.enable_rm: return "" - try: - cgroup_path = cgroups.create_impala_cgroup_path(instance_num + 1) - except Exception, ex: - raise RuntimeError("Unable to initialize RM: %s" % str(ex)) - llama_address = "localhost:15000" - - # Don't bother checking if the path doesn't exist, the impalad won't start up - relative_fs_cfg_path = 'cdh%s/node-%d/etc/hadoop/conf/fair-scheduler.xml' %\ - (os.environ.get('CDH_MAJOR_VERSION'), instance_num + 1) - fs_cfg_path = os.path.join(os.environ.get('CLUSTER_DIR'), relative_fs_cfg_path) - return RM_ARGS % (llama_address, cgroup_path, fs_cfg_path) - def start_impalad_instances(cluster_size): if cluster_size == 0: # No impalad instances should be started. @@ -250,11 +232,10 @@ def start_impalad_instances(cluster_size): # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance. param_args = (" ".join(options.impalad_args)).replace("#ID", str(i)) - args = "--mem_limit=%s %s %s %s %s %s" %\ + args = "--mem_limit=%s %s %s %s %s" %\ (mem_limit, # Goes first so --impalad_args will override it. build_impalad_logging_args(i, service_name), build_jvm_args(i), - build_impalad_port_args(i), param_args, - build_rm_args(i)) + build_impalad_port_args(i), param_args) stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name) exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/common/thrift/CMakeLists.txt b/common/thrift/CMakeLists.txt index e66bd89..3104ee2 100644 --- a/common/thrift/CMakeLists.txt +++ b/common/thrift/CMakeLists.txt @@ -162,7 +162,6 @@ set (SRC_FILES ImpalaService.thrift JniCatalog.thrift LineageGraph.thrift - Llama.thrift Logging.thrift NetworkTest.thrift MetricDefs.thrift @@ -171,7 +170,6 @@ set (SRC_FILES Planner.thrift Partitions.thrift parquet.thrift - ResourceBrokerService.thrift Results.thrift RuntimeProfile.thrift StatestoreService.thrift http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/Frontend.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index 708cb46..732ea4a 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -387,6 +387,7 @@ struct TQueryExecRequest { // Estimated per-host CPU requirements in YARN virtual cores. // Used for resource management. + // TODO: Remove this and associated code in Planner. 11: optional i16 per_host_vcores // List of replica hosts. Used by the host_idx field of TScanRangeLocation. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index bf03d98..003a618 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -33,7 +33,6 @@ include "DataSinks.thrift" include "Results.thrift" include "RuntimeProfile.thrift" include "ImpalaService.thrift" -include "Llama.thrift" // constants for TQueryOptions.num_nodes const i32 NUM_NODES_ALL = 0 @@ -366,12 +365,6 @@ struct TPlanFragmentInstanceCtx { // Id of this fragment in its role as a sender. 11: optional i32 sender_id - - // Resource reservation to run this plan fragment in. - 12: optional Llama.TAllocatedResource reserved_resource - - // Address of local node manager (used for expanding resource allocations) - 13: optional Types.TNetworkAddress local_resource_address } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/Llama.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Llama.thrift b/common/thrift/Llama.thrift deleted file mode 100644 index a9b7f5f..0000000 --- a/common/thrift/Llama.thrift +++ /dev/null @@ -1,276 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -namespace cpp llama -namespace java com.cloudera.llama.thrift - -//////////////////////////////////////////////////////////////////////////////// -// DATA TYPES - -enum TLlamaServiceVersion { - V1 -} - -struct TUniqueId { - 1: required i64 hi; - 2: required i64 lo; -} - -struct TNetworkAddress { - 1: required string hostname; - 2: required i32 port; -} - -enum TStatusCode { - OK, - REQUEST_ERROR, - INTERNAL_ERROR -} - -struct TStatus { - 1: required TStatusCode status_code; - 2: i16 error_code; - 3: list<string> error_msgs; -} - -enum TLocationEnforcement { - MUST, - PREFERRED, - DONT_CARE -} - -struct TResource { - 1: required TUniqueId client_resource_id; - 2: required i16 v_cpu_cores; - 3: required i32 memory_mb; - 4: required string askedLocation; - 5: required TLocationEnforcement enforcement; -} - -struct TAllocatedResource { - 1: required TUniqueId reservation_id; - 2: required TUniqueId client_resource_id; - 3: required string rm_resource_id; - 4: required i16 v_cpu_cores; - 5: required i32 memory_mb; - 6: required string location; -} - -struct TNodeCapacity { - 1: required i16 total_v_cpu_cores; - 2: required i32 total_memory_mb; - 3: required i16 free_v_cpu_cores; - 4: required i32 free_memory_mb; -} - -//////////////////////////////////////////////////////////////////////////////// -// Llama AM Service - -struct TLlamaAMRegisterRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId client_id; - 3: required TNetworkAddress notification_callback_service; -} - -struct TLlamaAMRegisterResponse { - 1: required TStatus status; - 2: optional TUniqueId am_handle; -} - -struct TLlamaAMUnregisterRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId am_handle; -} - -struct TLlamaAMUnregisterResponse { - 1: required TStatus status; -} - -struct TLlamaAMReservationRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId am_handle; - 3: required string user; - 4: optional string queue; - 5: required list<TResource> resources; - 6: required bool gang; - 7: optional TUniqueId reservation_id; -} - -struct TLlamaAMReservationResponse { - 1: required TStatus status; - 2: optional TUniqueId reservation_id; -} - -struct TLlamaAMReservationExpansionRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId am_handle; - 3: required TUniqueId expansion_of; - 4: required TResource resource; - 5: optional TUniqueId expansion_id; -} - -struct TLlamaAMReservationExpansionResponse { - 1: required TStatus status; - 2: optional TUniqueId expansion_id; -} - -struct TLlamaAMReleaseRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId am_handle; - 3: required TUniqueId reservation_id; -} - -struct TLlamaAMReleaseResponse { - 1: required TStatus status; -} - -struct TLlamaAMGetNodesRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId am_handle; -} - -struct TLlamaAMGetNodesResponse { - 1: required TStatus status; - 2: optional list<string> nodes; -} - -service LlamaAMService { - - TLlamaAMRegisterResponse Register(1: TLlamaAMRegisterRequest request); - - TLlamaAMUnregisterResponse Unregister(1: TLlamaAMUnregisterRequest request); - - TLlamaAMReservationResponse Reserve(1: TLlamaAMReservationRequest request); - - TLlamaAMReservationExpansionResponse Expand( - 1: TLlamaAMReservationExpansionRequest request); - - TLlamaAMReleaseResponse Release(1: TLlamaAMReleaseRequest request); - - TLlamaAMGetNodesResponse GetNodes(1: TLlamaAMGetNodesRequest request); - -} - -//////////////////////////////////////////////////////////////////////////////// -// Llama AM Admin Service - -struct TLlamaAMAdminReleaseRequest { - 1: required TLlamaServiceVersion version; - 2: optional bool do_not_cache = false; - 3: optional list<string> queues; - 4: optional list<TUniqueId> handles; - 5: optional list<TUniqueId> reservations; -} - -struct TLlamaAMAdminReleaseResponse { - 1: required TStatus status; -} - -struct TLlamaAMAdminEmptyCacheRequest { - 1: required TLlamaServiceVersion version; - 2: optional bool allQueues = false; - 3: optional list<string> queues; -} - -struct TLlamaAMAdminEmptyCacheResponse { - 1: required TStatus status; -} - -service LlamaAMAdminService { - - TLlamaAMAdminReleaseResponse Release - (1: TLlamaAMAdminReleaseRequest request); - - TLlamaAMAdminEmptyCacheResponse EmptyCache - (1: TLlamaAMAdminEmptyCacheRequest request); - -} - -//////////////////////////////////////////////////////////////////////////////// -// Llama NM Service - -struct TLlamaNMRegisterRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId client_id; - 3: required TNetworkAddress notification_callback_service; -} - -struct TLlamaNMRegisterResponse { - 1: required TStatus status; - 2: optional TUniqueId nm_handle; -} - -struct TLlamaNMUnregisterRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId nm_handle; -} - -struct TLlamaNMUnregisterResponse { - 1: required TStatus status; -} - -service LlamaNMService { - - TLlamaNMRegisterResponse Register(1: TLlamaNMRegisterRequest request); - - TLlamaNMUnregisterResponse Unregister(1: TLlamaNMUnregisterRequest request); - -} - -//////////////////////////////////////////////////////////////////////////////// -// Llama Notification Callback Service - -struct TLlamaAMNotificationRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId am_handle; - 3: required bool heartbeat; - 4: optional list<TUniqueId> allocated_reservation_ids; - 5: optional list<TAllocatedResource> allocated_resources; - 6: optional list<TUniqueId> rejected_reservation_ids; - 7: optional list<TUniqueId> rejected_client_resource_ids; - 8: optional list<TUniqueId> lost_client_resource_ids; - 9: optional list<TUniqueId> preempted_reservation_ids; - 10: optional list<TUniqueId> preempted_client_resource_ids; - 11: optional list<TUniqueId> admin_released_reservation_ids; - 12: optional list<TUniqueId> lost_reservation_ids; -} - -struct TLlamaAMNotificationResponse { - 1: required TStatus status; -} - -struct TLlamaNMNotificationRequest { - 1: required TLlamaServiceVersion version; - 2: required TUniqueId nm_handle; - 3: required TNodeCapacity node_capacity; - 4: list<string> preempted_rm_resource_ids; -} - -struct TLlamaNMNotificationResponse { - 1: required TStatus status; -} - -service LlamaNotificationService { - - TLlamaAMNotificationResponse AMNotification( - 1: TLlamaAMNotificationRequest request); - - TLlamaNMNotificationResponse NMNotification( - 1: TLlamaNMNotificationRequest request); -} - -////////////////////////////////////////////////////////////////////////////////