IMPALA-4504: fix races in PlanFragmentExecutor regarding status reporting

The PlanFragmentExecutor has some state shared between the main execution thread and the periodic reporting thread that isn't synchronized properly. IMPALA-4504 describes one such problem, and that bug was introduced in an attempt to fix another similar race.
Let's just simplify all of this and remove this shared state. Instead, the profile thread will always be responsible for sending periodic '!done' messages, and the main execution thread will always be responsible for sending the final 'done' message (after joining the periodic thread). This will allow for even more simplification, in particular the interaction between FragmentExecState and PlanFragmentExecutor, but I'm not doing that now so as to avoid more conflicts with the MT work. Change-Id: I052b7b4fabb341ad27ad294cd5b0a53728d87d0e Reviewed-on: http://gerrit.cloudera.org:8080/5250 Reviewed-by: Dan Hecht <dhe...@cloudera.com> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/56f4d0f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/56f4d0f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/56f4d0f5 Branch: refs/heads/master Commit: 56f4d0f592f129054985786287ad8ed8c241fc74 Parents: a41918d Author: Dan Hecht <dhe...@cloudera.com> Authored: Mon Nov 28 15:38:12 2016 -0800 Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org> Committed: Fri Dec 2 00:35:02 2016 +0000 ---------------------------------------------------------------------- be/src/runtime/plan-fragment-executor.cc | 101 +++++++++----------------- be/src/runtime/plan-fragment-executor.h | 65 +++++++---------- be/src/service/fragment-exec-state.cc | 18 ++--- be/src/service/fragment-exec-state.h | 6 +- 4 files changed, 68 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index cfa4863..07dcfb0 100644 --- 
a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -76,8 +76,12 @@ PlanFragmentExecutor::PlanFragmentExecutor( report_thread_active_(false), closed_(false), has_thread_token_(false), + timings_profile_(NULL), + root_sink_(NULL), is_prepared_(false), is_cancelled_(false), + per_host_mem_usage_(NULL), + rows_produced_counter_(NULL), average_thread_tokens_(NULL), mem_usage_sampled_counter_(NULL), thread_usage_sampled_counter_(NULL) {} @@ -91,6 +95,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() { Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { Status status = PrepareInternal(request); prepared_promise_.Set(status); + if (!status.ok()) FragmentComplete(status); return status; } @@ -287,11 +292,12 @@ void PlanFragmentExecutor::PrintVolumeIds( } Status PlanFragmentExecutor::Open() { + DCHECK(prepared_promise_.IsSet() && prepared_promise_.Get().ok()); SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME)); VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id(); Status status = OpenInternal(); - UpdateStatus(status); + if (!status.ok()) FragmentComplete(status); opened_promise_.Set(status); return status; } @@ -301,17 +307,16 @@ Status PlanFragmentExecutor::OpenInternal() { RETURN_IF_ERROR( runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get())); - // we need to start the profile-reporting thread before calling exec_tree_->Open(), - // since it - // may block + // We need to start the profile-reporting thread before calling exec_tree_->Open(), + // since it may block. 
if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) { unique_lock<mutex> l(report_thread_lock_); report_thread_.reset( new Thread("plan-fragment-executor", "report-profile", - &PlanFragmentExecutor::ReportProfile, this)); - // make sure the thread started up, otherwise ReportProfile() might get into a race - // with StopReportThread() - report_thread_started_cv_.wait(l); + &PlanFragmentExecutor::ReportProfileThread, this)); + // Make sure the thread started up, otherwise ReportProfileThread() might get into + // a race with StopReportThread(). + while (!report_thread_active_) report_thread_started_cv_.wait(l); } RETURN_IF_ERROR(OptimizeLlvmModule()); @@ -324,29 +329,22 @@ Status PlanFragmentExecutor::OpenInternal() { } Status PlanFragmentExecutor::Exec() { + DCHECK(opened_promise_.IsSet() && opened_promise_.Get().ok()); SCOPED_TIMER(profile()->total_time_counter()); Status status; { // Must go out of scope before FragmentComplete(), otherwise counter will not be // updated by time final profile is sent, and will always be 0. SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME)); - { - lock_guard<mutex> l(status_lock_); - RETURN_IF_ERROR(status_); - } status = ExecInternal(); } - - // If there's no error, ExecInternal() completed the fragment instance's execution. - if (status.ok()) { - FragmentComplete(); - } else if (!status.IsCancelled() && !status.IsMemLimitExceeded()) { - // Log error message in addition to returning in Status. Queries that do not - // fetch results (e.g. insert) may not receive the message directly and can - // only retrieve the log. + if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) { + // Log error message in addition to returning in Status. Queries that do not fetch + // results (e.g. insert) may not receive the message directly and can only retrieve + // the log. 
runtime_state_->LogError(status.msg()); } - UpdateStatus(status); + FragmentComplete(status); return status; } @@ -376,8 +374,9 @@ Status PlanFragmentExecutor::ExecInternal() { return Status::OK(); } -void PlanFragmentExecutor::ReportProfile() { - VLOG_FILE << "ReportProfile(): instance_id=" << runtime_state_->fragment_instance_id(); +void PlanFragmentExecutor::ReportProfileThread() { + VLOG_FILE << "ReportProfileThread(): instance_id=" + << runtime_state_->fragment_instance_id(); DCHECK(!report_status_cb_.empty()); unique_lock<mutex> l(report_thread_lock_); // tell Open() that we started @@ -414,40 +413,27 @@ void PlanFragmentExecutor::ReportProfile() { } if (!report_thread_active_) break; - - if (completed_report_sent_.Load() == 0) { - // No complete fragment report has been sent. - SendReport(false); - } + SendReport(false, Status::OK()); } VLOG_FILE << "exiting reporting thread: instance_id=" << runtime_state_->fragment_instance_id(); } -void PlanFragmentExecutor::SendReport(bool done) { +void PlanFragmentExecutor::SendReport(bool done, const Status& status) { + DCHECK(status.ok() || done); if (report_status_cb_.empty()) return; - Status status; - { - lock_guard<mutex> l(status_lock_); - status = status_; - } - - // If status is not OK, we need to make sure that only one sender sends a 'done' - // response. - // TODO: Clean all this up - move 'done' reporting to Close()? - if (!done && !status.ok()) { - done = completed_report_sent_.CompareAndSwap(0, 1); - } - // Update the counter for the peak per host mem usage. - per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption()); + if (per_host_mem_usage_ != nullptr) { + per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption()); + } // This will send a report even if we are cancelled. If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will // be waiting for a final report and profile. 
- report_status_cb_(status, profile(), done); + RuntimeProfile* prof = is_prepared_ ? profile() : nullptr; + report_status_cb_(status, prof, done); } void PlanFragmentExecutor::StopReportThread() { @@ -460,29 +446,11 @@ void PlanFragmentExecutor::StopReportThread() { report_thread_->Join(); } -void PlanFragmentExecutor::FragmentComplete() { - // Check the atomic flag. If it is set, then a fragment complete report has already - // been sent. - bool send_report = completed_report_sent_.CompareAndSwap(0, 1); +void PlanFragmentExecutor::FragmentComplete(const Status& status) { ReleaseThreadToken(); StopReportThread(); - if (send_report) SendReport(true); -} - -void PlanFragmentExecutor::UpdateStatus(const Status& status) { - if (status.ok()) return; - - bool send_report = completed_report_sent_.CompareAndSwap(0, 1); - - { - lock_guard<mutex> l(status_lock_); - if (status_.ok()) { - status_ = status; - } - } - - StopReportThread(); - if (send_report) SendReport(true); + // It's safe to send final report now that the reporting thread is stopped. + SendReport(true, status); } void PlanFragmentExecutor::Cancel() { @@ -520,6 +488,9 @@ void PlanFragmentExecutor::ReleaseThreadToken() { } void PlanFragmentExecutor::Close() { + DCHECK(!has_thread_token_); + DCHECK(!report_thread_active_); + if (closed_) return; if (!is_prepared_) return; if (sink_.get() != nullptr) sink_->Close(runtime_state()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/runtime/plan-fragment-executor.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h index 22f2653..f93dab4 100644 --- a/be/src/runtime/plan-fragment-executor.h +++ b/be/src/runtime/plan-fragment-executor.h @@ -62,14 +62,12 @@ class TPlanExecParams; /// includes profile information for the plan itself as well as the output sink. 
/// /// The ReportStatusCallback passed into the c'tor is invoked periodically to report the -/// execution status. The frequency of those reports is controlled by the flag +/// execution profile. The frequency of those reports is controlled by the flag /// status_report_interval; setting that flag to 0 disables periodic reporting altogether /// Regardless of the value of that flag, if a report callback is specified, it is invoked /// at least once at the end of execution with an overall status and profile (and 'done' -/// indicator). The only exception is when execution is cancelled, in which case the -/// callback is *not* invoked (the coordinator already knows that execution stopped, -/// because it initiated the cancellation). -// +/// indicator). +/// /// Aside from Cancel(), which may be called asynchronously, this class is not /// thread-safe. class PlanFragmentExecutor { @@ -103,14 +101,19 @@ class PlanFragmentExecutor { /// /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns /// Status::CANCELLED; + /// + /// If Prepare() fails, it will invoke final status callback with the error status. Status Prepare(const TExecPlanFragmentParams& request); - /// Opens the fragment plan and sink. Starts the profile reporting thread, if required. + /// Opens the fragment plan and sink. Starts the profile reporting thread, if + /// required. Can be called only if Prepare() succeeded. If Open() fails it will + /// invoke the final status callback with the error status. Status Open(); /// Executes the fragment by repeatedly driving the sink with batches produced by the /// exec node tree. report_status_cb will have been called for the final time when - /// Exec() returns, and the status-reporting thread will have been stopped. + /// Exec() returns, and the status-reporting thread will have been stopped. Can be + /// called only if Open() succeeded. 
Status Exec(); /// Closes the underlying plan fragment and frees up all resources allocated in @@ -169,7 +172,8 @@ class PlanFragmentExecutor { /// When the report thread starts, it sets 'report_thread_active_' to true and signals /// 'report_thread_started_cv_'. The report thread is shut down by setting - /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'. + /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'. Protected + /// by 'report_thread_lock_'. bool report_thread_active_; /// true if Close() has been called @@ -178,16 +182,6 @@ class PlanFragmentExecutor { /// true if this fragment has not returned the thread token to the thread resource mgr bool has_thread_token_; - /// Overall execution status. Either ok() or set to the first error status that - /// was encountered. - Status status_; - - /// Protects status_ - /// lock ordering: - /// 1. report_thread_lock_ - /// 2. status_lock_ - boost::mutex status_lock_; - /// 'runtime_state_' has to be before 'sink_' as 'sink_' relies on the object pool of /// 'runtime_state_'. This means 'sink_' is destroyed first so any implicit connections /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to @@ -242,12 +236,6 @@ class PlanFragmentExecutor { /// of the execution. RuntimeProfile::Counter* average_thread_tokens_; - /// (Atomic) Flag that indicates whether a completed fragment report has been or will - /// be fired. It is initialized to 0 and atomically swapped to 1 when a completed - /// fragment report is about to be fired. Used for reducing the probability that a - /// report is sent twice at the end of the fragment. - AtomicInt32 completed_report_sent_; - /// Sampled memory usage at even time intervals. 
  RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_; @@ -260,22 +248,19 @@ class PlanFragmentExecutor { typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges; /// Main loop of profile reporting thread. - /// Exits when notified on done_cv_. - /// On exit, *no report is sent*, ie, this will not send the final report. - void ReportProfile(); - - /// Invoked the report callback if there is a report callback and the current - /// status isn't CANCELLED. Sets 'done' to true in the callback invocation if - /// done == true or we have an error status. - void SendReport(bool done); - - /// If status_.ok(), sets status_ to status. - /// If we're transitioning to an error status, stops report thread and - /// sends a final report. - void UpdateStatus(const Status& status); - - /// Called when the fragment execution is complete to finalize counters. - void FragmentComplete(); + /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to + /// false. This will not send the final report. + void ReportProfileThread(); + + /// Invokes the report callback. If 'done' is true, sends the final report with + /// 'status' and the profile. This type of report is sent once and only by the + /// instance execution thread. Otherwise, a profile-only report is sent, which the + /// ReportProfileThread() thread will do periodically. + void SendReport(bool done, const Status& status); + + /// Called when the fragment execution is complete to finalize counters and send + /// the final status report. Must be called only once. + void FragmentComplete(const Status& status); /// Optimizes the code-generated functions in runtime_state_->llvm_codegen(). /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open().
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/service/fragment-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc index cc56c19..337be82 100644 --- a/be/src/service/fragment-exec-state.cc +++ b/be/src/service/fragment-exec-state.cc @@ -46,25 +46,17 @@ Status FragmentMgr::FragmentExecState::Cancel() { return Status::OK(); } -Status FragmentMgr::FragmentExecState::Prepare() { +void FragmentMgr::FragmentExecState::Exec() { Status status = executor_.Prepare(exec_params_); - if (!status.ok()) ReportStatusCb(status, NULL, true); prepare_promise_.Set(status); - return status; -} - -void FragmentMgr::FragmentExecState::Exec() { - if (Prepare().ok()) { - executor_.Open(); - executor_.Exec(); + if (status.ok()) { + if (executor_.Open().ok()) { + executor_.Exec(); + } } executor_.Close(); } -// There can only be one of these callbacks in-flight at any moment, because -// it is only invoked from the executor's reporting thread. -// Also, the reported status will always reflect the most recent execution status, -// including the final status when execution finishes. void FragmentMgr::FragmentExecState::ReportStatusCb( const Status& status, RuntimeProfile* profile, bool done) { DCHECK(status.ok() || done); // if !status.ok() => done http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/service/fragment-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h index c795cd8..1c64f2d 100644 --- a/be/src/service/fragment-exec-state.h +++ b/be/src/service/fragment-exec-state.h @@ -83,7 +83,7 @@ class FragmentMgr::FragmentExecState { /// if set to anything other than OK, execution has terminated w/ an error Status exec_status_; - /// Set once Prepare() has returned with exec_status_. 
+ /// Barrier for the completion of executor_.Prepare(). Promise<Status> prepare_promise_; /// Update 'exec_status_' w/ 'status', if the former is not already an error. @@ -92,14 +92,12 @@ class FragmentMgr::FragmentExecState { /// Callback for executor; updates exec_status_ if 'status' indicates an error /// or if there was a thrift error. - /// /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created /// for this fragment (e.g. when the fragment has failed during preparation). + /// The executor must ensure that there is only one invocation at a time. void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done); - /// Call Prepare() and create and initialize data sink. - Status Prepare(); }; }