IMPALA-6609: Fix ownership of class members in KrpcDataStreamRecvr

A KrpcDataStreamRecvr is co-owned by the singleton KrpcDataStreamMgr
and an exchange node. It's possible that some threads (e.g. RPC service
threads) may still retain a reference to the KrpcDataStreamRecvr after its
owning exchange node has been closed. This is problematic because some members
of the receiver (e.g. the sender/receiver profiles) are actually owned by the
exchange node, so accessing them after the exchange node is closed and
possibly deleted may lead to use-after-free.
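
To illustrate, the bug has roughly this shape (a minimal sketch with
hypothetical types, not the actual Impala classes):

  // Hypothetical repro of the lifetime hazard described above.
  #include <iostream>
  #include <memory>

  struct Counter { long value = 0; };

  // Stands in for the exchange node, which owns the profile counters.
  struct ExchangeNode {
    std::unique_ptr<Counter> counter = std::make_unique<Counter>();
  };

  // Stands in for the receiver: it holds a raw pointer into memory that
  // the exchange node owns.
  struct Receiver {
    Counter* bytes_received;  // Not owned; dangles once the node is deleted.
  };

  int main() {
    auto node = std::make_unique<ExchangeNode>();
    auto recvr = std::make_shared<Receiver>(Receiver{node->counter.get()});

    // An RPC service thread keeps its own reference to the receiver...
    std::shared_ptr<Receiver> rpc_thread_ref = recvr;

    // ...but the exchange node is closed and deleted first.
    node.reset();

    // Dereferencing now would be a use-after-free:
    // rpc_thread_ref->bytes_received->value += 1024;
    std::cout << "receiver outlived the counter it points into\n";
    return 0;
  }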

This patch transfers ownership of some members of KrpcDataStreamRecvr
to the receiver itself. In particular, the profiles are now owned by the
receiver, and the various stat counters and timers are in turn owned by
those profiles. This prevents the use-after-free problem described above.
This patch also moves the access to deferred_rpc_tracker_ in
TakeOverEarlySender() to be under the sender queue's 'lock_' to prevent
another instance of IMPALA-6554.
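
The ownership change follows this pattern (a simplified sketch with invented
types; the patch itself uses an ObjectPool and RuntimeProfile::Create()):

  #include <memory>
  #include <string>
  #include <vector>

  struct Profile {
    std::string name;
    std::vector<Profile*> children;  // Links to children only; no ownership.
    void AddChild(Profile* child) { children.push_back(child); }
  };

  struct Receiver {
    // The receiver owns its child profiles (unique_ptr here for brevity),
    // so threads holding a reference to the receiver can safely touch the
    // counters hanging off these profiles.
    std::unique_ptr<Profile> recvr_side_profile{new Profile{"RecvrSide", {}}};
    std::unique_ptr<Profile> sender_side_profile{new Profile{"SenderSide", {}}};

    explicit Receiver(Profile* exchange_profile) {
      // The exchange node's profile only links to the children, so deleting
      // the node's members no longer invalidates anything the receiver reads.
      exchange_profile->AddChild(recvr_side_profile.get());
      exchange_profile->AddChild(sender_side_profile.get());
    }
  };

  int main() {
    Profile exchange_profile{"EXCHANGE_NODE", {}};
    Receiver recvr(&exchange_profile);
    return recvr.recvr_side_profile->name == "RecvrSide" ? 0 : 1;
  }

Note the caveat spelled out in the header comment of the diff below: since the
parent profile keeps raw pointers to these children, the receiver-owned
profiles must outlive the exchange node's profile.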

Testing done: ran core tests with a debug build.

Change-Id: I3378496e2201b16c662b9daa634333480705c61a
Reviewed-on: http://gerrit.cloudera.org:8080/9527
Reviewed-by: Dan Hecht <dhe...@cloudera.com>
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/6f430bd7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/6f430bd7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/6f430bd7

Branch: refs/heads/2.x
Commit: 6f430bd74caca0e2a35a60bb1098a7d3e3aab20c
Parents: e7fa31e
Author: Michael Ho <k...@cloudera.com>
Authored: Mon Mar 5 19:15:55 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Tue Mar 13 01:52:24 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/fragment-instance-state.cc |  1 +
 be/src/runtime/fragment-instance-state.h  | 13 ++++++
 be/src/runtime/krpc-data-stream-recvr.cc  | 54 ++++++++++++++++++-----
 be/src/runtime/krpc-data-stream-recvr.h   | 61 +++++++++++++++++++-------
 be/src/runtime/query-state.cc             |  5 ++-
 5 files changed, 104 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/6f430bd7/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index c5c57a3..acbce84 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -55,6 +55,7 @@ using namespace apache::thrift;
 
 const string FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
 const string FragmentInstanceState::FINST_THREAD_GROUP_NAME = "fragment-execution";
+const string FragmentInstanceState::FINST_THREAD_NAME_PREFIX = "exec-finstance";
 
 static const string OPEN_TIMER_NAME = "OpenTime";
 static const string PREPARE_TIMER_NAME = "PrepareTime";

http://git-wip-us.apache.org/repos/asf/impala/blob/6f430bd7/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 292b93c..8295c8f 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -25,6 +25,7 @@
 
 #include "common/atomic.h"
 #include "common/status.h"
+#include "common/thread-debug-info.h"
 #include "util/promise.h"
 
 #include "gen-cpp/ImpalaInternalService_types.h"
@@ -127,7 +128,19 @@ class FragmentInstanceState {
   const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
   ObjectPool* obj_pool();
 
+  /// Returns true if the current thread is a thread executing the whole or part of
+  /// a fragment instance.
+  static bool IsFragmentExecThread() {
+    const static size_t name_len =
+        strlen(FragmentInstanceState::FINST_THREAD_NAME_PREFIX.c_str());
+    const char* name = GetThreadDebugInfo()->GetThreadName();
+    return name != nullptr &&
+        (strncmp(name, FINST_THREAD_NAME_PREFIX.c_str(), name_len) == 0 ||
+         strncmp(name, "join-build-thread", 17) == 0);
+  }
+
   static const std::string FINST_THREAD_GROUP_NAME;
+  static const std::string FINST_THREAD_NAME_PREFIX;
 
  private:
   QueryState* query_state_;
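
The new helper keys off the thread name prefix that is set when fragment
execution threads are created (see the query-state.cc hunk below); as the hunk
above shows, the real helper also accepts "join-build-thread" workers. A
standalone sketch of the same idea, with invented stand-ins for Impala's
ThreadDebugInfo machinery:

  #include <cstring>
  #include <string>

  // Invented stand-in for GetThreadDebugInfo()->GetThreadName().
  static thread_local std::string current_thread_name;

  static const std::string kFinstThreadNamePrefix = "exec-finstance";

  // Mirrors FragmentInstanceState::IsFragmentExecThread(): a thread counts as
  // a fragment execution thread if its name carries the expected prefix.
  bool IsFragmentExecThread() {
    return std::strncmp(current_thread_name.c_str(),
        kFinstThreadNamePrefix.c_str(), kFinstThreadNamePrefix.length()) == 0;
  }

  int main() {
    current_thread_name = "exec-finstance (finst:1234)";
    return IsFragmentExecThread() ? 0 : 1;
  }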

http://git-wip-us.apache.org/repos/asf/impala/blob/6f430bd7/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 85563bf..c7126d4 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -25,6 +25,7 @@
 
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
+#include "runtime/fragment-instance-state.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
@@ -33,6 +34,7 @@
 #include "service/data-stream-service.h"
 #include "util/runtime-profile-counters.h"
 #include "util/periodic-counter-updater.h"
+#include "util/test-info.h"
 
 #include "gen-cpp/data_stream_service.pb.h"
 
@@ -189,6 +191,8 @@ KrpcDataStreamRecvr::SenderQueue::SenderQueue(
 
 Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
   SCOPED_TIMER(recvr_->queue_get_batch_time_);
+  DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
+  DCHECK(!recvr_->closed_);
   int num_to_dequeue = 0;
   // The sender id is set below when we decide to dequeue entries from 'deferred_rpcs_'.
   int sender_id = -1;
@@ -315,7 +319,7 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
     // handle deleting any unconsumed batches from batch_queue_. Close() cannot proceed
     // until there are no pending insertions to batch_queue_.
     status = RowBatch::FromProtobuf(recvr_->row_desc(), header, tuple_offsets, tuple_data,
-        recvr_->parent_tracker(), recvr_->client(), &batch);
+        recvr_->parent_tracker(), recvr_->buffer_pool_client(), &batch);
   }
   lock->lock();
 
@@ -394,8 +398,8 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     DCHECK_GT(num_deserialize_tasks_pending_, 0);
     --num_deserialize_tasks_pending_;
 
-    // Returns if the queue is empty. The queue may be drained in Cancel().
     if (deferred_rpcs_.empty()) return;
+    // A sender queue cannot be cancelled if there is any deferred RPC.
     DCHECK(!is_cancelled_);
 
     // Try enqueuing the first entry into 'batch_queue_'.
@@ -439,16 +443,22 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
 
 void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
     unique_ptr<TransmitDataCtx> ctx) {
+  // TakeOverEarlySender() is called by the same thread which calls Close().
+  // The receiver cannot be closed while this function is in progress so
+  // 'recvr_->mgr_' shouldn't be NULL.
+  DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
+  DCHECK(!recvr_->closed_ && recvr_->mgr_ != nullptr);
   int sender_id = ctx->request->sender_id();
-  recvr_->deferred_rpc_tracker()->Consume(ctx->rpc_context->GetTransferSize());
   COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
     if (UNLIKELY(is_cancelled_)) {
-      DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response,
-          ctx->rpc_context, recvr_->deferred_rpc_tracker());
+      Status::OK().ToProto(ctx->response->mutable_status());
+      ctx->rpc_context->RespondSuccess();
       return;
     }
+    // Only enqueue a deferred RPC if the sender queue is not yet cancelled.
+    recvr_->deferred_rpc_tracker()->Consume(ctx->rpc_context->GetTransferSize());
     deferred_rpcs_.push(move(ctx));
     ++num_deserialize_tasks_pending_;
   }
@@ -511,6 +521,7 @@ void KrpcDataStreamRecvr::SenderQueue::Close() {
 
 Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
   DCHECK(is_merging_);
+  DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   vector<SortedRunMerger::RunBatchSupplierFn> input_batch_suppliers;
   input_batch_suppliers.reserve(sender_queues_.size());
 
@@ -529,6 +540,7 @@ Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
 }
 
 void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
+  DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   for (SenderQueue* sender_queue: sender_queues_) {
     if (sender_queue->current_batch() != nullptr) {
       sender_queue->current_batch()->TransferResourceOwnership(transfer_batch);
@@ -547,23 +559,28 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     total_buffer_limit_(total_buffer_limit),
     row_desc_(row_desc),
     is_merging_(is_merging),
+    closed_(false),
     num_buffered_bytes_(0),
     deferred_rpc_tracker_(new MemTracker(-1, "KrpcDeferredRpcs", parent_tracker)),
     parent_tracker_(parent_tracker),
-    client_(client),
+    buffer_pool_client_(client),
     profile_(profile),
-    recvr_side_profile_(profile_->CreateChild("RecvrSide")),
-    sender_side_profile_(profile_->CreateChild("SenderSide")) {
+    recvr_side_profile_(RuntimeProfile::Create(&pool_, "RecvrSide")),
+    sender_side_profile_(RuntimeProfile::Create(&pool_, "SenderSide")) {
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
   int num_sender_per_queue = is_merging ? 1 : num_senders;
   for (int i = 0; i < num_queues; ++i) {
-    SenderQueue* queue =
-        sender_queue_pool_.Add(new SenderQueue(this, num_sender_per_queue));
+    SenderQueue* queue = pool_.Add(new SenderQueue(this, num_sender_per_queue));
     sender_queues_.push_back(queue);
   }
 
+  // Add the receiver and sender sides' profiles as children of the owning exchange
+  // node's profile.
+  profile_->AddChild(recvr_side_profile_);
+  profile_->AddChild(sender_side_profile_);
+
   // Initialize the counters
   bytes_received_counter_ =
       ADD_COUNTER(recvr_side_profile_, "TotalBytesReceived", TUnit::BYTES);
@@ -572,11 +589,11 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   queue_get_batch_time_ = ADD_TIMER(recvr_side_profile_, "TotalGetBatchTime");
   data_arrival_timer_ =
       ADD_CHILD_TIMER(recvr_side_profile_, "DataArrivalTimer", "TotalGetBatchTime");
+  inactive_timer_ = profile_->inactive_timer();
   first_batch_wait_total_timer_ =
       ADD_TIMER(recvr_side_profile_, "FirstBatchArrivalWaitTime");
   deserialize_row_batch_timer_ =
       ADD_TIMER(sender_side_profile_, "DeserializeRowBatchTime");
-  inactive_timer_ = profile_->inactive_timer();
   num_early_senders_ =
       ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
   num_arrived_batches_ =
@@ -592,6 +609,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
 }
 
 Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
+  DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   DCHECK(merger_.get() != nullptr);
   return merger_->GetNext(output_batch, eos);
 }
@@ -627,17 +645,29 @@ void KrpcDataStreamRecvr::CancelStream() {
 }
 
 void KrpcDataStreamRecvr::Close() {
+  DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
+  DCHECK(!closed_);
+  closed_ = true;
   // Remove this receiver from the KrpcDataStreamMgr that created it.
   // All the sender queues will be cancelled after this call returns.
   const Status status = mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
   if (!status.ok()) {
     LOG(ERROR) << "Error deregistering receiver: " << status.GetDetail();
   }
-  mgr_ = nullptr;
   for (auto& queue: sender_queues_) queue->Close();
   merger_.reset();
+
+  // Given all queues have been cancelled and closed already at this point, it's safe to
+  // call Close() on 'deferred_rpc_tracker_' without holding any lock here.
   deferred_rpc_tracker_->Close();
   recvr_side_profile_->StopPeriodicCounters();
+
+  // Remove references to the unowned resources which may be freed after Close().
+  mgr_ = nullptr;
+  row_desc_ = nullptr;
+  parent_tracker_ = nullptr;
+  buffer_pool_client_ = nullptr;
+  profile_ = nullptr;
 }
 
 KrpcDataStreamRecvr::~KrpcDataStreamRecvr() {

http://git-wip-us.apache.org/repos/asf/impala/blob/6f430bd7/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index c07e0ec..845dbf5 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -71,6 +71,19 @@ class TransmitDataResponsePB;
 ///
 /// KrpcDataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove
 /// the recvr instance from the tracking structure of its KrpcDataStreamMgr in all cases.
+///
+/// Unless otherwise stated, class members belong to KrpcDataStreamRecvr. They are safe to
+/// access from any thread as long as the caller obtained a shared_ptr to keep the
+/// receiver alive. Class members not owned by the receiver must stay valid until after
+/// Close() is called. Since a receiver is co-owned by an exchange node and the
+/// singleton KrpcDataStreamMgr, it's possible that certain threads may race with Close()
+/// called from the fragment execution thread. A receiver may also be cancelled at any
+/// time due to query cancellation. To avoid resource leaks, the following protocol is
+/// followed:
+/// - callers must obtain the target sender queue's lock and check if it's cancelled
+/// - no new row batch or deferred RPCs should be added to a cancelled sender queue
+/// - Cancel() will drain the deferred RPCs queue and the row batch queue
+///
 class KrpcDataStreamRecvr : public DataStreamRecvrBase {
  public:
   ~KrpcDataStreamRecvr();
@@ -79,24 +92,28 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// Retains ownership of the returned batch. The caller must call TransferAllResources()
   /// to acquire the resources from the returned batch before the next call to GetBatch().
   /// A NULL returned batch indicates eos. Must only be called if is_merging_ is false.
+  /// Called from fragment instance execution threads only.
   /// TODO: This is currently only exposed to the non-merging version of the exchange.
   /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).
   Status GetBatch(RowBatch** next_batch);
 
   /// Deregister from KrpcDataStreamMgr instance, which shares ownership of this instance.
+  /// Called from fragment instance execution threads only.
   void Close();
 
   /// Create a SortedRunMerger instance to merge rows from multiple senders according to
   /// the specified row comparator. Fetches the first batches from the individual sender
   /// queues. The exprs used in less_than must have already been prepared and opened.
+  /// Called from fragment instance execution threads only.
   Status CreateMerger(const TupleRowComparator& less_than);
 
   /// Fill output_batch with the next batch of rows obtained by merging the per-sender
-  /// input streams. Must only be called if is_merging_ is true.
+  /// input streams. Must only be called if is_merging_ is true. Called from fragment
+  /// instance execution threads only.
   Status GetNext(RowBatch* output_batch, bool* eos);
 
   /// Transfer all resources from the current batches being processed from each sender
-  /// queue to the specified batch.
+  /// queue to the specified batch. Called from fragment instance execution threads only.
   void TransferAllResources(RowBatch* transfer_batch);
 
   const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
@@ -104,7 +121,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   const RowDescriptor* row_desc() const { return row_desc_; }
   MemTracker* deferred_rpc_tracker() const { return deferred_rpc_tracker_.get(); }
   MemTracker* parent_tracker() const { return parent_tracker_; }
-  BufferPool::ClientHandle* client() const { return client_; }
+  BufferPool::ClientHandle* buffer_pool_client() const { return buffer_pool_client_; }
 
  private:
   friend class KrpcDataStreamMgr;
@@ -127,11 +144,13 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// Tries adding the first entry of 'deferred_rpcs_' queue for the sender queue
   /// identified by 'sender_id'. If is_merging_ is false, it always defaults to
   /// queue 0; if is_merging_ is true, the sender queue is identified by 'sender_id_'.
+  /// Called from KrpcDataStreamMgr's deserialization threads only.
   void DequeueDeferredRpc(int sender_id);
 
   /// Takes over the RPC state 'ctx' of an early sender for deferred processing and
   /// kicks off a deserialization task to process it asynchronously. This makes sure
   /// new incoming RPCs won't pass the early senders, leading to starvation.
+  /// Called from fragment instance execution threads only.
   void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx);
 
   /// Indicate that a particular sender is done. Delegated to the appropriate
@@ -148,7 +167,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
     return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
   }
 
-  /// KrpcDataStreamMgr instance used to create this recvr. (Not owned)
+  /// KrpcDataStreamMgr instance used to create this recvr. Not owned.
   KrpcDataStreamMgr* mgr_;
 
   /// Fragment and node id of the destination exchange node this receiver is used by.
@@ -160,18 +179,22 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// buffered data exceeds this value.
   const int64_t total_buffer_limit_;
 
-  /// Row schema.
+  /// Row schema. Not owned.
   const RowDescriptor* row_desc_;
 
   /// True if this receiver merges incoming rows from different senders. Per-sender
   /// row batch queues are maintained in this case.
-  bool is_merging_;
+  const bool is_merging_;
+
+  /// True if Close() has been called on this receiver already. Should only be accessed
+  /// from the fragment execution thread.
+  bool closed_;
 
   /// total number of bytes held across all sender queues.
   AtomicInt32 num_buffered_bytes_;
 
-  /// Memtracker for payloads of deferred Rpcs in the sender queue(s).
-  /// This must be accessed with 'lock_' held to avoid race with Close().
+  /// MemTracker for payloads of deferred RPCs in the sender queue(s). This must be
+  /// accessed with a sender queue's lock held to avoid race with Close() of the queue.
   boost::scoped_ptr<MemTracker> deferred_rpc_tracker_;
 
   /// The MemTracker of the exchange node which owns this receiver. Not owned.
@@ -179,32 +202,38 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   MemTracker* parent_tracker_;
 
   /// The buffer pool client for allocating buffers of incoming row batches. Not owned.
-  BufferPool::ClientHandle* client_;
+  BufferPool::ClientHandle* buffer_pool_client_;
 
   /// One or more queues of row batches received from senders. If is_merging_ is true,
   /// there is one SenderQueue for each sender. Otherwise, row batches from all senders
   /// are placed in the same SenderQueue. The SenderQueue instances are owned by the
-  /// receiver and placed in sender_queue_pool_.
+  /// receiver and placed in 'pool_'.
   std::vector<SenderQueue*> sender_queues_;
 
   /// SortedRunMerger used to merge rows from different senders.
   boost::scoped_ptr<SortedRunMerger> merger_;
 
-  /// Pool of sender queues.
-  ObjectPool sender_queue_pool_;
+  /// Pool which owns sender queues and the runtime profiles.
+  ObjectPool pool_;
 
-  /// Runtime profile storing the counters below.
+  /// Runtime profile of the owning exchange node. It's the parent of
+  /// 'recvr_side_profile_' and 'sender_side_profile_'. Not owned.
   RuntimeProfile* profile_;
 
   /// Maintain two child profiles - receiver side measurements (from the GetBatch() path),
-  /// and sender side measurements (from AddBatch()).
+  /// and sender side measurements (from AddBatch()). These two profiles own all counters
+  /// below unless otherwise noted. These profiles are owned by the receiver and placed
+  /// in 'pool_'. 'recvr_side_profile_' and 'sender_side_profile_' must outlive 'profile_'
+  /// to prevent accessing freed memory during top-down traversal of 'profile_'. The
+  /// receiver is co-owned by the exchange node and the data stream manager so these two
+  /// profiles should outlive the exchange node which owns 'profile_'.
   RuntimeProfile* recvr_side_profile_;
   RuntimeProfile* sender_side_profile_;
 
   /// Number of bytes received but not necessarily enqueued.
   RuntimeProfile::Counter* bytes_received_counter_;
 
-  /// Time series of number of bytes received, samples bytes_received_counter_
+  /// Time series of number of bytes received, samples bytes_received_counter_.
   RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
 
   /// Total wall-clock time spent deserializing row batches.
@@ -237,7 +266,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
   RuntimeProfile::Counter* data_arrival_timer_;
 
-  /// Pointer to profile's inactive timer.
+  /// Pointer to profile's inactive timer. Not owned.
   RuntimeProfile::Counter* inactive_timer_;
 
   /// Total time spent in SenderQueue::GetBatch().
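
The cancellation protocol spelled out in the new header comment above (take
the queue's lock, check the cancelled flag, and only then charge the tracker
and enqueue) boils down to a pattern like the following sketch. The names are
invented for illustration; the real logic lives in
KrpcDataStreamRecvr::SenderQueue:

  #include <mutex>
  #include <queue>

  // Invented stand-ins for MemTracker and a deferred RPC.
  struct MemTracker {
    void Consume(long bytes) { consumed += bytes; }
    void Release(long bytes) { consumed -= bytes; }
    long consumed = 0;
  };
  struct DeferredRpc { long size = 0; };

  class SenderQueue {
   public:
    // Charge the tracker and enqueue only while holding 'lock_' and only if
    // the queue is not cancelled; otherwise the caller responds to the RPC
    // immediately and nothing is leaked.
    bool TryEnqueue(DeferredRpc rpc, MemTracker* tracker) {
      std::lock_guard<std::mutex> l(lock_);
      if (is_cancelled_) return false;
      tracker->Consume(rpc.size);
      deferred_rpcs_.push(rpc);
      return true;
    }

    // Cancel() drains the queue under the same lock, releasing whatever was
    // charged, so no deferred RPC survives past cancellation.
    void Cancel(MemTracker* tracker) {
      std::lock_guard<std::mutex> l(lock_);
      is_cancelled_ = true;
      while (!deferred_rpcs_.empty()) {
        tracker->Release(deferred_rpcs_.front().size);
        deferred_rpcs_.pop();
      }
    }

   private:
    std::mutex lock_;
    bool is_cancelled_ = false;
    std::queue<DeferredRpc> deferred_rpcs_;
  };

  int main() {
    MemTracker tracker;
    SenderQueue queue;
    queue.TryEnqueue(DeferredRpc{1024}, &tracker);
    queue.Cancel(&tracker);
    return tracker.consumed == 0 ? 0 : 1;
  }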

http://git-wip-us.apache.org/repos/asf/impala/blob/6f430bd7/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index b290784..1cc646d 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -326,8 +326,9 @@ void QueryState::StartFInstances() {
     // start new thread to execute instance
     refcnt_.Add(1); // decremented in ExecFInstance()
     AcquireExecResourceRefcount(); // decremented in ExecFInstance()
-    string thread_name = Substitute(
-        "exec-finstance (finst:$0)", 
PrintId(instance_ctx.fragment_instance_id));
+    string thread_name = Substitute("$0 (finst:$1)",
+        FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
+        PrintId(instance_ctx.fragment_instance_id));
     unique_ptr<Thread> t;
     thread_create_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
         thread_name, [this, fis]() { this->ExecFInstance(fis); }, &t, true);
