IMPALA-4504: fix races in PlanFragmentExecutor regarding status reporting

The PlanFragmentExecutor has some state shared between the main
execution thread and the periodic reporting thread that isn't
synchronized properly.  IMPALA-4504 describes one such problem, and that
bug was introduced in an attempt to fix another similar race.

Let's just simplify all of this and remove this shared state.  Instead,
the profile thread will always be responsible for sending periodic
'!done' messages, and the main execution thread will always be
responsible for sending the final 'done' message (after joining the
periodic thread).

This will allow for even more simplification, in particular the
interaction between FragmentExecState and PlanFragmentExecutor, but
I'm not doing that now so as to avoid more conflicts with the MT work.
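
For reference, the division of labor is simple enough to sketch in
isolation. Below is a minimal, self-contained C++ sketch of the protocol
(illustrative only, not Impala code; the Reporter class and every name in
it are hypothetical): the reporting thread only ever sends periodic
not-done reports, and the execution thread joins it before sending the
one final 'done' report, so no completion state needs to be shared.

// Sketch of the reporting protocol, under the assumptions stated above.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class Reporter {
 public:
  void Start() {
    std::unique_lock<std::mutex> l(lock_);
    thread_ = std::thread([this] { ReportLoop(); });
    // Wait until the thread is really running, mirroring the patch's
    // 'while (!report_thread_active_) report_thread_started_cv_.wait(l)'.
    started_cv_.wait(l, [this] { return active_; });
  }

  void StopAndSendFinal(bool ok) {
    {
      std::lock_guard<std::mutex> l(lock_);
      active_ = false;
      stop_cv_.notify_one();
    }
    thread_.join();
    // Only now, with the periodic thread joined, send the single final
    // report; no other sender can race with it.
    std::cout << "final report, done=true, ok=" << ok << "\n";
  }

 private:
  void ReportLoop() {
    std::unique_lock<std::mutex> l(lock_);
    active_ = true;
    started_cv_.notify_one();
    while (active_) {
      // Wake up periodically, or immediately when told to stop.
      stop_cv_.wait_for(l, std::chrono::milliseconds(100));
      if (!active_) break;  // shutting down: final report is not ours to send
      std::cout << "periodic report, done=false\n";
    }
  }

  std::mutex lock_;
  std::condition_variable started_cv_, stop_cv_;
  bool active_ = false;  // protected by lock_
  std::thread thread_;
};

int main() {
  Reporter r;
  r.Start();
  std::this_thread::sleep_for(std::chrono::milliseconds(350));  // "execution"
  r.StopAndSendFinal(true);
}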

Change-Id: I052b7b4fabb341ad27ad294cd5b0a53728d87d0e
Reviewed-on: http://gerrit.cloudera.org:8080/5250
Reviewed-by: Dan Hecht <dhe...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/56f4d0f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/56f4d0f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/56f4d0f5

Branch: refs/heads/master
Commit: 56f4d0f592f129054985786287ad8ed8c241fc74
Parents: a41918d
Author: Dan Hecht <dhe...@cloudera.com>
Authored: Mon Nov 28 15:38:12 2016 -0800
Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org>
Committed: Fri Dec 2 00:35:02 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/plan-fragment-executor.cc | 101 +++++++++-----------------
 be/src/runtime/plan-fragment-executor.h  |  65 +++++++----------
 be/src/service/fragment-exec-state.cc    |  18 ++---
 be/src/service/fragment-exec-state.h     |   6 +-
 4 files changed, 68 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index cfa4863..07dcfb0 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -76,8 +76,12 @@ PlanFragmentExecutor::PlanFragmentExecutor(
     report_thread_active_(false),
     closed_(false),
     has_thread_token_(false),
+    timings_profile_(NULL),
+    root_sink_(NULL),
     is_prepared_(false),
     is_cancelled_(false),
+    per_host_mem_usage_(NULL),
+    rows_produced_counter_(NULL),
     average_thread_tokens_(NULL),
     mem_usage_sampled_counter_(NULL),
     thread_usage_sampled_counter_(NULL) {}
@@ -91,6 +95,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
 Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
   Status status = PrepareInternal(request);
   prepared_promise_.Set(status);
+  if (!status.ok()) FragmentComplete(status);
   return status;
 }
 
@@ -287,11 +292,12 @@ void PlanFragmentExecutor::PrintVolumeIds(
 }
 
 Status PlanFragmentExecutor::Open() {
+  DCHECK(prepared_promise_.IsSet() && prepared_promise_.Get().ok());
   SCOPED_TIMER(profile()->total_time_counter());
   SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
   VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id();
   Status status = OpenInternal();
-  UpdateStatus(status);
+  if (!status.ok()) FragmentComplete(status);
   opened_promise_.Set(status);
   return status;
 }
@@ -301,17 +307,16 @@ Status PlanFragmentExecutor::OpenInternal() {
   RETURN_IF_ERROR(
       runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
 
-  // we need to start the profile-reporting thread before calling exec_tree_->Open(),
-  // since it
-  // may block
+  // We need to start the profile-reporting thread before calling exec_tree_->Open(),
+  // since it may block.
   if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
     unique_lock<mutex> l(report_thread_lock_);
     report_thread_.reset(
         new Thread("plan-fragment-executor", "report-profile",
-            &PlanFragmentExecutor::ReportProfile, this));
-    // make sure the thread started up, otherwise ReportProfile() might get into a race
-    // with StopReportThread()
-    report_thread_started_cv_.wait(l);
+            &PlanFragmentExecutor::ReportProfileThread, this));
+    // Make sure the thread started up, otherwise ReportProfileThread() might get into
+    // a race with StopReportThread().
+    while (!report_thread_active_) report_thread_started_cv_.wait(l);
   }
 
   RETURN_IF_ERROR(OptimizeLlvmModule());
@@ -324,29 +329,22 @@ Status PlanFragmentExecutor::OpenInternal() {
 }
 
 Status PlanFragmentExecutor::Exec() {
+  DCHECK(opened_promise_.IsSet() && opened_promise_.Get().ok());
   SCOPED_TIMER(profile()->total_time_counter());
   Status status;
   {
     // Must go out of scope before FragmentComplete(), otherwise counter will not be
     // updated by time final profile is sent, and will always be 0.
     SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME));
-    {
-      lock_guard<mutex> l(status_lock_);
-      RETURN_IF_ERROR(status_);
-    }
     status = ExecInternal();
   }
-
-  // If there's no error, ExecInternal() completed the fragment instance's execution.
-  if (status.ok()) {
-    FragmentComplete();
-  } else if (!status.IsCancelled() && !status.IsMemLimitExceeded()) {
-    // Log error message in addition to returning in Status. Queries that do not
-    // fetch results (e.g. insert) may not receive the message directly and can
-    // only retrieve the log.
+  if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) {
+    // Log error message in addition to returning in Status. Queries that do not fetch
+    // results (e.g. insert) may not receive the message directly and can only retrieve
+    // the log.
     runtime_state_->LogError(status.msg());
   }
-  UpdateStatus(status);
+  FragmentComplete(status);
   return status;
 }
 
@@ -376,8 +374,9 @@ Status PlanFragmentExecutor::ExecInternal() {
   return Status::OK();
 }
 
-void PlanFragmentExecutor::ReportProfile() {
-  VLOG_FILE << "ReportProfile(): instance_id=" << runtime_state_->fragment_instance_id();
+void PlanFragmentExecutor::ReportProfileThread() {
+  VLOG_FILE << "ReportProfileThread(): instance_id="
+            << runtime_state_->fragment_instance_id();
   DCHECK(!report_status_cb_.empty());
   unique_lock<mutex> l(report_thread_lock_);
   // tell Open() that we started
@@ -414,40 +413,27 @@ void PlanFragmentExecutor::ReportProfile() {
     }
 
     if (!report_thread_active_) break;
-
-    if (completed_report_sent_.Load() == 0) {
-      // No complete fragment report has been sent.
-      SendReport(false);
-    }
+    SendReport(false, Status::OK());
   }
 
   VLOG_FILE << "exiting reporting thread: instance_id="
       << runtime_state_->fragment_instance_id();
 }
 
-void PlanFragmentExecutor::SendReport(bool done) {
+void PlanFragmentExecutor::SendReport(bool done, const Status& status) {
+  DCHECK(status.ok() || done);
   if (report_status_cb_.empty()) return;
 
-  Status status;
-  {
-    lock_guard<mutex> l(status_lock_);
-    status = status_;
-  }
-
-  // If status is not OK, we need to make sure that only one sender sends a 'done'
-  // response.
-  // TODO: Clean all this up - move 'done' reporting to Close()?
-  if (!done && !status.ok()) {
-    done = completed_report_sent_.CompareAndSwap(0, 1);
-  }
-
   // Update the counter for the peak per host mem usage.
-  per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
+  if (per_host_mem_usage_ != nullptr) {
+    per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
+  }
 
   // This will send a report even if we are cancelled.  If the query completed correctly
   // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
   // be waiting for a final report and profile.
-  report_status_cb_(status, profile(), done);
+  RuntimeProfile* prof = is_prepared_ ? profile() : nullptr;
+  report_status_cb_(status, prof, done);
 }
 
 void PlanFragmentExecutor::StopReportThread() {
@@ -460,29 +446,11 @@ void PlanFragmentExecutor::StopReportThread() {
   report_thread_->Join();
 }
 
-void PlanFragmentExecutor::FragmentComplete() {
-  // Check the atomic flag. If it is set, then a fragment complete report has already
-  // been sent.
-  bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
+void PlanFragmentExecutor::FragmentComplete(const Status& status) {
   ReleaseThreadToken();
   StopReportThread();
-  if (send_report) SendReport(true);
-}
-
-void PlanFragmentExecutor::UpdateStatus(const Status& status) {
-  if (status.ok()) return;
-
-  bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
-
-  {
-    lock_guard<mutex> l(status_lock_);
-    if (status_.ok()) {
-      status_ = status;
-    }
-  }
-
-  StopReportThread();
-  if (send_report) SendReport(true);
+  // It's safe to send final report now that the reporting thread is stopped.
+  SendReport(true, status);
 }
 
 void PlanFragmentExecutor::Cancel() {
@@ -520,6 +488,9 @@ void PlanFragmentExecutor::ReleaseThreadToken() {
 }
 
 void PlanFragmentExecutor::Close() {
+  DCHECK(!has_thread_token_);
+  DCHECK(!report_thread_active_);
+
   if (closed_) return;
   if (!is_prepared_) return;
   if (sink_.get() != nullptr) sink_->Close(runtime_state());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 22f2653..f93dab4 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -62,14 +62,12 @@ class TPlanExecParams;
 /// includes profile information for the plan itself as well as the output sink.
 ///
 /// The ReportStatusCallback passed into the c'tor is invoked periodically to report the
-/// execution status. The frequency of those reports is controlled by the flag
+/// execution profile. The frequency of those reports is controlled by the flag
 /// status_report_interval; setting that flag to 0 disables periodic reporting altogether
 /// Regardless of the value of that flag, if a report callback is specified, it is invoked
 /// at least once at the end of execution with an overall status and profile (and 'done'
-/// indicator). The only exception is when execution is cancelled, in which case the
-/// callback is *not* invoked (the coordinator already knows that execution stopped,
-/// because it initiated the cancellation).
-//
+/// indicator).
+///
 /// Aside from Cancel(), which may be called asynchronously, this class is not
 /// thread-safe.
 class PlanFragmentExecutor {
@@ -103,14 +101,19 @@ class PlanFragmentExecutor {
   ///
   /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns
   /// Status::CANCELLED;
+  ///
+  /// If Prepare() fails, it will invoke final status callback with the error status.
   Status Prepare(const TExecPlanFragmentParams& request);
 
-  /// Opens the fragment plan and sink. Starts the profile reporting thread, if required.
+  /// Opens the fragment plan and sink. Starts the profile reporting thread, if
+  /// required.  Can be called only if Prepare() succeeded. If Open() fails it will
+  /// invoke the final status callback with the error status.
   Status Open();
 
   /// Executes the fragment by repeatedly driving the sink with batches produced by the
   /// exec node tree. report_status_cb will have been called for the final time when
-  /// Exec() returns, and the status-reporting thread will have been stopped.
+  /// Exec() returns, and the status-reporting thread will have been stopped. Can be
+  /// called only if Open() succeeded.
   Status Exec();
 
   /// Closes the underlying plan fragment and frees up all resources allocated in
@@ -169,7 +172,8 @@ class PlanFragmentExecutor {
 
   /// When the report thread starts, it sets 'report_thread_active_' to true and signals
   /// 'report_thread_started_cv_'. The report thread is shut down by setting
-  /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'.
+  /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'. Protected
+  /// by 'report_thread_lock_'.
   bool report_thread_active_;
 
   /// true if Close() has been called
@@ -178,16 +182,6 @@ class PlanFragmentExecutor {
   /// true if this fragment has not returned the thread token to the thread resource mgr
   bool has_thread_token_;
 
-  /// Overall execution status. Either ok() or set to the first error status that
-  /// was encountered.
-  Status status_;
-
-  /// Protects status_
-  /// lock ordering:
-  /// 1. report_thread_lock_
-  /// 2. status_lock_
-  boost::mutex status_lock_;
-
   /// 'runtime_state_' has to be before 'sink_' as 'sink_' relies on the object pool of
   /// 'runtime_state_'. This means 'sink_' is destroyed first so any implicit connections
   /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to
@@ -242,12 +236,6 @@ class PlanFragmentExecutor {
   /// of the execution.
   RuntimeProfile::Counter* average_thread_tokens_;
 
-  /// (Atomic) Flag that indicates whether a completed fragment report has been or will
-  /// be fired. It is initialized to 0 and atomically swapped to 1 when a completed
-  /// fragment report is about to be fired. Used for reducing the probability that a
-  /// report is sent twice at the end of the fragment.
-  AtomicInt32 completed_report_sent_;
-
   /// Sampled memory usage at even time intervals.
   RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_;
 
@@ -260,22 +248,19 @@ class PlanFragmentExecutor {
   typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 
   /// Main loop of profile reporting thread.
-  /// Exits when notified on done_cv_.
-  /// On exit, *no report is sent*, ie, this will not send the final report.
-  void ReportProfile();
-
-  /// Invoked the report callback if there is a report callback and the current
-  /// status isn't CANCELLED. Sets 'done' to true in the callback invocation if
-  /// done == true or we have an error status.
-  void SendReport(bool done);
-
-  /// If status_.ok(), sets status_ to status.
-  /// If we're transitioning to an error status, stops report thread and
-  /// sends a final report.
-  void UpdateStatus(const Status& status);
-
-  /// Called when the fragment execution is complete to finalize counters.
-  void FragmentComplete();
+  /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to
+  /// false. This will not send the final report.
+  void ReportProfileThread();
+
+  /// Invoked the report callback. If 'done' is true, sends the final report with
+  /// 'status' and the profile. This type of report is sent once and only by the
+  /// instance execution thread.  Otherwise, a profile-only report is sent, which the
+  /// ReportProfileThread() thread will do periodically.
+  void SendReport(bool done, const Status& status);
+
+  /// Called when the fragment execution is complete to finalize counters and send
+  /// the final status report.  Must be called only once.
+  void FragmentComplete(const Status& status);
 
   /// Optimizes the code-generated functions in runtime_state_->llvm_codegen().
   /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index cc56c19..337be82 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -46,25 +46,17 @@ Status FragmentMgr::FragmentExecState::Cancel() {
   return Status::OK();
 }
 
-Status FragmentMgr::FragmentExecState::Prepare() {
+void FragmentMgr::FragmentExecState::Exec() {
   Status status = executor_.Prepare(exec_params_);
-  if (!status.ok()) ReportStatusCb(status, NULL, true);
   prepare_promise_.Set(status);
-  return status;
-}
-
-void FragmentMgr::FragmentExecState::Exec() {
-  if (Prepare().ok()) {
-    executor_.Open();
-    executor_.Exec();
+  if (status.ok()) {
+    if (executor_.Open().ok()) {
+      executor_.Exec();
+    }
   }
   executor_.Close();
 }
 
-// There can only be one of these callbacks in-flight at any moment, because
-// it is only invoked from the executor's reporting thread.
-// Also, the reported status will always reflect the most recent execution status,
-// including the final status when execution finishes.
 void FragmentMgr::FragmentExecState::ReportStatusCb(
     const Status& status, RuntimeProfile* profile, bool done) {
   DCHECK(status.ok() || done);  // if !status.ok() => done

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/service/fragment-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h
index c795cd8..1c64f2d 100644
--- a/be/src/service/fragment-exec-state.h
+++ b/be/src/service/fragment-exec-state.h
@@ -83,7 +83,7 @@ class FragmentMgr::FragmentExecState {
   /// if set to anything other than OK, execution has terminated w/ an error
   Status exec_status_;
 
-  /// Set once Prepare() has returned with exec_status_.
+  /// Barrier for the completion of executor_.Prepare().
   Promise<Status> prepare_promise_;
 
   /// Update 'exec_status_' w/ 'status', if the former is not already an error.
@@ -92,14 +92,12 @@ class FragmentMgr::FragmentExecState {
 
   /// Callback for executor; updates exec_status_ if 'status' indicates an error
   /// or if there was a thrift error.
-  ///
   /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of
   /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created
   /// for this fragment (e.g. when the fragment has failed during preparation).
+  /// The executor must ensure that there is only one invocation at a time.
   void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done);
 
-  /// Call Prepare() and create and initialize data sink.
-  Status Prepare();
 };
 
 }
