IMPALA-6346: Potential deadlock in KrpcDataStreamMgr

In KrpcDataStreamMgr::CreateRecvr() we take the lock_ and
then call recvr->TakeOverEarlySender() for all contexts.
recvr->TakeOverEarlySender() then calls
recvr_->mgr_->EnqueueDeserializeTask(), which can block if the
deserialize pool queue is full. A deserialization thread that is
already processing a payload from that queue also has to acquire
lock_, so it can never finish and free up a slot in the queue,
thus leading to a deadlock.

We fix this by moving the EarlySendersList out of the
EarlySendersMap and dropping 'lock_' before taking any actions on
the RPC contexts in the EarlySendersList. None of the functions
called after dropping 'lock_' need the lock to protect them, since
they are thread-safe.

Additionally, modified the backend (BE) test data-stream-test so that
it runs against KRPC as well as Thrift.

Testing: Added a new test (TestNoDeadlock) to data-stream-test to
verify that the deadlock does not happen. I also verified that this
test hangs without the fix.

Change-Id: Ib7d1a8f12a4821092ca61ccc8a6f20c0404d56c7
Reviewed-on: http://gerrit.cloudera.org:8080/8950
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ff86feaa
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ff86feaa
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ff86feaa

Branch: refs/heads/master
Commit: ff86feaa67ff8bf703896e33d9a358e42739ae30
Parents: f0b3d9d
Author: Sailesh Mukil <sail...@cloudera.com>
Authored: Fri Jan 5 10:48:13 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Fri Feb 2 02:14:46 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-test.cc     | 282 +++++++++++++++++++++++-----
 be/src/runtime/krpc-data-stream-mgr.cc |  44 +++--
 2 files changed, 267 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ff86feaa/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 5e70497..07eefd4 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -23,13 +23,18 @@
 #include "common/status.h"
 #include "codegen/llvm-codegen.h"
 #include "exprs/slot-ref.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/service_if.h"
 #include "rpc/auth-provider.h"
 #include "rpc/thrift-server.h"
+#include "rpc/rpc-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/data-stream-mgr-base.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-sender.h"
 #include "runtime/data-stream-sender.h"
 #include "runtime/data-stream-recvr-base.h"
 #include "runtime/data-stream-recvr.h"
@@ -38,6 +43,7 @@
 #include "runtime/backend-client.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
+#include "service/data-stream-service.h"
 #include "service/fe-support.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
@@ -47,6 +53,7 @@
 #include "util/mem-info.h"
 #include "util/test-info.h"
 #include "util/tuple-row-compare.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
@@ -54,6 +61,7 @@
 #include "service/fe-support.h"
 
 #include <iostream>
+#include <unistd.h>
 
 #include "common/names.h"
 
@@ -61,9 +69,17 @@ using namespace impala;
 using namespace apache::thrift;
 using namespace apache::thrift::protocol;
 
-DEFINE_int32(port, 20001, "port on which to run Impala test backend");
-DECLARE_string(principal);
+using kudu::MetricEntity;
+using kudu::rpc::ResultTracker;
+using kudu::rpc::RpcContext;
+using kudu::rpc::ServiceIf;
+
+DEFINE_int32(port, 20001, "port on which to run Impala Thrift based test 
backend.");
 DECLARE_int32(datastream_sender_timeout_ms);
+DECLARE_int32(datastream_service_num_deserialization_threads);
+DECLARE_int32(datastream_service_deserialization_queue_size);
+
+DECLARE_bool(use_krpc);
 
 // We reserve contiguous memory for senders in SetUp. If a test uses more
 // senders, a DCHECK will fail and you should increase this value.
@@ -78,10 +94,12 @@ static const int NUM_BATCHES = TOTAL_DATA_SIZE / 
BATCH_CAPACITY / PER_ROW_DATA;
 
 namespace impala {
 
-class ImpalaTestBackend : public ImpalaInternalServiceIf {
+// This class acts as a service interface for all Thrift related communication 
within
+// this test file.
+class ImpalaThriftTestBackend : public ImpalaInternalServiceIf {
  public:
-  ImpalaTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
-  virtual ~ImpalaTestBackend() {}
+  ImpalaThriftTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
+  virtual ~ImpalaThriftTestBackend() {}
 
   virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
       const TExecQueryFInstancesParams& params) {}
@@ -109,13 +127,43 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
   DataStreamMgr* mgr_;
 };
 
-class DataStreamTest : public testing::Test {
+// This class acts as a service interface for all KRPC related communication 
within
+// this test file.
+class ImpalaKRPCTestBackend : public DataStreamServiceIf {
+ public:
+  ImpalaKRPCTestBackend(RpcMgr* rpc_mgr, KrpcDataStreamMgr* stream_mgr)
+    : DataStreamServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
+      stream_mgr_(stream_mgr) {}
+  virtual ~ImpalaKRPCTestBackend() {}
+
+  virtual void TransmitData(const TransmitDataRequestPB* request,
+      TransmitDataResponsePB* response, RpcContext* rpc_context) {
+    stream_mgr_->AddData(request, response, rpc_context);
+  }
+
+  virtual void EndDataStream(const EndDataStreamRequestPB* request,
+      EndDataStreamResponsePB* response, RpcContext* rpc_context) {
+    stream_mgr_->CloseSender(request, response, rpc_context);
+  }
+
+ private:
+  KrpcDataStreamMgr* stream_mgr_;
+};
+
+template <class T> class DataStreamTestBase : public T {
+ protected:
+  virtual void SetUp() {}
+  virtual void TearDown() {}
+};
+
+enum KrpcSwitch {
+  USE_THRIFT,
+  USE_KRPC
+};
+
+class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwitch> > {
  protected:
   DataStreamTest() : next_val_(0) {
-    // Initialize MemTrackers and RuntimeState for use by the data stream 
receiver.
-    ABORT_IF_ERROR(exec_env_.InitForFeTests());
-    runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_));
-    mem_pool_.reset(new MemPool(&tracker_));
 
     // Stop tests that rely on mismatched sender / receiver pairs timing out 
from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
@@ -123,6 +171,14 @@ class DataStreamTest : public testing::Test {
   ~DataStreamTest() { runtime_state_->ReleaseResources(); }
 
   virtual void SetUp() {
+    // Initialize MemTrackers and RuntimeState for use by the data stream 
receiver.
+    FLAGS_use_krpc = GetParam() == USE_KRPC;
+
+    exec_env_.reset(new ExecEnv());
+    ABORT_IF_ERROR(exec_env_->InitForFeTests());
+    runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
+    mem_pool_.reset(new MemPool(&tracker_));
+
     CreateRowDesc();
 
     is_asc_.push_back(true);
@@ -131,7 +187,8 @@ class DataStreamTest : public testing::Test {
 
     next_instance_id_.lo = 0;
     next_instance_id_.hi = 0;
-    stream_mgr_ = new DataStreamMgr(new MetricGroup(""));
+    stream_mgr_ = ExecEnv::GetInstance()->stream_mgr();
+    if (GetParam() == USE_KRPC) krpc_mgr_ = ExecEnv::GetInstance()->rpc_mgr();
 
     broadcast_sink_.dest_node_id = DEST_NODE_ID;
     broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;
@@ -159,7 +216,11 @@ class DataStreamTest : public testing::Test {
     // Ensure that individual sender info addresses don't change
     sender_info_.reserve(MAX_SENDERS);
     receiver_info_.reserve(MAX_RECEIVERS);
-    StartBackend();
+    if (GetParam() == USE_THRIFT) {
+      StartThriftBackend();
+    } else {
+      StartKrpcBackend();
+    }
   }
 
   const TDataSink GetSink(TPartitionType::type partition_type) {
@@ -185,8 +246,12 @@ class DataStreamTest : public testing::Test {
     less_than_->Close(runtime_state_.get());
     ScalarExpr::Close(ordering_exprs_);
     mem_pool_->FreeAll();
-    exec_env_.impalad_client_cache()->TestShutdown();
-    StopBackend();
+    if (GetParam() == USE_THRIFT) {
+      exec_env_->impalad_client_cache()->TestShutdown();
+      StopThriftBackend();
+    } else {
+      StopKrpcBackend();
+    }
   }
 
   void Reset() {
@@ -203,7 +268,7 @@ class DataStreamTest : public testing::Test {
   vector<bool> is_asc_;
   vector<bool> nulls_first_;
   TupleRowComparator* less_than_;
-  ExecEnv exec_env_;
+  boost::scoped_ptr<ExecEnv> exec_env_;
   scoped_ptr<RuntimeState> runtime_state_;
   TUniqueId next_instance_id_;
   string stmt_;
@@ -215,8 +280,12 @@ class DataStreamTest : public testing::Test {
   int next_val_;
   int64_t* tuple_mem_;
 
+  // Only used for KRPC. Not owned.
+  RpcMgr* krpc_mgr_ = nullptr;
+  TNetworkAddress krpc_address_;
+
   // receiving node
-  DataStreamMgrBase* stream_mgr_;
+  DataStreamMgrBase* stream_mgr_ = nullptr;
   ThriftServer* server_;
 
   // sending node(s)
@@ -266,6 +335,9 @@ class DataStreamTest : public testing::Test {
     dest.fragment_instance_id = next_instance_id_;
     dest.server.hostname = "127.0.0.1";
     dest.server.port = FLAGS_port;
+    if (GetParam() == USE_KRPC) {
+      dest.__set_krpc_server(krpc_address_);
+    }
     *instance_id = next_instance_id_;
     ++next_instance_id_.lo;
   }
@@ -452,24 +524,51 @@ class DataStreamTest : public testing::Test {
     }
   }
 
-  // Start backend in separate thread.
-  void StartBackend() {
+  // Start Thrift based backend in separate thread.
+  void StartThriftBackend() {
     // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived 
type
-    // DataStreamMgr, since ImpalaTestBackend() accepts only DataStreamMgr*.
-    boost::shared_ptr<ImpalaTestBackend> handler(
-        new ImpalaTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
+    // DataStreamMgr, since ImpalaThriftTestBackend() accepts only 
DataStreamMgr*.
+    boost::shared_ptr<ImpalaThriftTestBackend> handler(
+        new 
ImpalaThriftTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
     boost::shared_ptr<TProcessor> processor(new 
ImpalaInternalServiceProcessor(handler));
     ThriftServerBuilder builder("DataStreamTest backend", processor, 
FLAGS_port);
     ASSERT_OK(builder.Build(&server_));
     ASSERT_OK(server_->Start());
   }
 
-  void StopBackend() {
+  void StartKrpcBackend() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
+
+    MemTracker* data_svc_tracker = obj_pool_.Add(
+        new MemTracker(-1, "Data Stream Service",
+            ExecEnv::GetInstance()->process_mem_tracker()));
+    MemTracker* stream_mgr_tracker = obj_pool_.Add(
+        new MemTracker(-1, "Data Stream Queued RPC Calls",
+            ExecEnv::GetInstance()->process_mem_tracker()));
+
+    KrpcDataStreamMgr* stream_mgr_ref = 
dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_);
+    ASSERT_OK(stream_mgr_ref->Init(stream_mgr_tracker, data_svc_tracker));
+    ASSERT_OK(krpc_mgr_->Init());
+
+    unique_ptr<ServiceIf> handler(
+        new ImpalaKRPCTestBackend(krpc_mgr_, stream_mgr_ref));
+    ASSERT_OK(krpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, 
move(handler),
+        data_svc_tracker));
+    ASSERT_OK(krpc_mgr_->StartServices(krpc_address_));
+  }
+
+  void StopThriftBackend() {
     VLOG_QUERY << "stop backend\n";
     server_->StopForTesting();
     delete server_;
   }
 
+  void StopKrpcBackend() {
+    krpc_mgr_->Shutdown();
+  }
+
   void StartSender(TPartitionType::type partition_type = 
TPartitionType::UNPARTITIONED,
                    int channel_buffer_size = 1024) {
     VLOG_QUERY << "start sender";
@@ -479,7 +578,7 @@ class DataStreamTest : public testing::Test {
     SenderInfo& info = sender_info_.back();
     info.thread_handle =
         new thread(&DataStreamTest::Sender, this, num_senders, 
channel_buffer_size,
-                   partition_type);
+                   partition_type, GetParam() == USE_THRIFT);
   }
 
   void JoinSenders() {
@@ -489,35 +588,54 @@ class DataStreamTest : public testing::Test {
     }
   }
 
-  void Sender(
-      int sender_num, int channel_buffer_size, TPartitionType::type 
partition_type) {
-    RuntimeState state(TQueryCtx(), &exec_env_, desc_tbl_);
+  void Sender(int sender_num,
+      int channel_buffer_size, TPartitionType::type partition_type, bool 
is_thrift) {
+    RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
-    DataStreamSender sender(
-        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, 
&state);
+
+    // We create an object of the base class DataSink and cast to the 
appropriate sender
+    // according to the 'is_thrift' option.
+    scoped_ptr<DataSink> sender;
 
     TExprNode expr_node;
     expr_node.node_type = TExprNodeType::SLOT_REF;
     TExpr output_exprs;
     output_exprs.nodes.push_back(expr_node);
-    EXPECT_OK(sender.Init(vector<TExpr>({output_exprs}), sink, &state));
 
-    EXPECT_OK(sender.Prepare(&state, &tracker_));
-    EXPECT_OK(sender.Open(&state));
+    if (is_thrift) {
+      sender.reset(new DataStreamSender(
+          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, 
&state));
+      EXPECT_OK(static_cast<DataStreamSender*>(
+          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
+    } else {
+      sender.reset(new KrpcDataStreamSender(
+          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, 
&state));
+      EXPECT_OK(static_cast<KrpcDataStreamSender*>(
+          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
+    }
+
+    EXPECT_OK(sender->Prepare(&state, &tracker_));
+    EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
     SenderInfo& info = sender_info_[sender_num];
     int next_val = 0;
     for (int i = 0; i < NUM_BATCHES; ++i) {
       GetNextBatch(batch.get(), &next_val);
       VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
-      info.status = sender.Send(&state, batch.get());
+      info.status = sender->Send(&state, batch.get());
       if (!info.status.ok()) break;
     }
     VLOG_QUERY << "closing sender" << sender_num;
-    info.status.MergeStatus(sender.FlushFinal(&state));
-    sender.Close(&state);
-    info.num_bytes_sent = sender.GetNumDataBytesSent();
+    info.status.MergeStatus(sender->FlushFinal(&state));
+    sender->Close(&state);
+    if (is_thrift) {
+      info.num_bytes_sent = static_cast<DataStreamSender*>(
+          sender.get())->GetNumDataBytesSent();
+    } else {
+      info.num_bytes_sent = static_cast<KrpcDataStreamSender*>(
+          sender.get())->GetNumDataBytesSent();
+    }
 
     batch->Reset();
     state.ReleaseResources();
@@ -542,7 +660,44 @@ class DataStreamTest : public testing::Test {
   }
 };
 
-TEST_F(DataStreamTest, UnknownSenderSmallResult) {
+// We use a separate class for tests that are required to be run against 
Thrift only.
+class DataStreamTestThriftOnly : public DataStreamTest {
+ protected:
+  virtual void SetUp() {
+    DataStreamTest::SetUp();
+  }
+
+  virtual void TearDown() {
+    DataStreamTest::TearDown();
+  }
+};
+
+// We need a separate test class for IMPALA-6346, since we need to do some 
pre-SetUp()
+// work. Specifically we need to set 2 flags that will be picked up during the 
SetUp()
+// phase of the DataStreamTest class.
+class DataStreamTestForImpala6346 : public DataStreamTest {
+ protected:
+  virtual void SetUp() {
+    FLAGS_datastream_service_num_deserialization_threads = 1;
+    FLAGS_datastream_service_deserialization_queue_size = 1;
+    DataStreamTest::SetUp();
+  }
+
+  virtual void TearDown() {
+    DataStreamTest::TearDown();
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
+    ::testing::Values(USE_THRIFT, USE_KRPC));
+
+INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
+    ::testing::Values(USE_THRIFT));
+
+INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestForImpala6346,
+    ::testing::Values(USE_KRPC));
+
+TEST_P(DataStreamTest, UnknownSenderSmallResult) {
   // starting a sender w/o a corresponding receiver results in an error. No 
bytes should
   // be sent.
   // case 1: entire query result fits in single buffer
@@ -554,7 +709,7 @@ TEST_F(DataStreamTest, UnknownSenderSmallResult) {
   EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
-TEST_F(DataStreamTest, UnknownSenderLargeResult) {
+TEST_P(DataStreamTest, UnknownSenderLargeResult) {
   // case 2: query result requires multiple buffers
   TUniqueId dummy_id;
   GetNextInstanceId(&dummy_id);
@@ -564,7 +719,7 @@ TEST_F(DataStreamTest, UnknownSenderLargeResult) {
   EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
-TEST_F(DataStreamTest, Cancel) {
+TEST_P(DataStreamTest, Cancel) {
   TUniqueId instance_id;
   StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, 
&instance_id);
   stream_mgr_->Cancel(instance_id);
@@ -575,7 +730,7 @@ TEST_F(DataStreamTest, Cancel) {
   EXPECT_TRUE(receiver_info_[1].status.IsCancelled());
 }
 
-TEST_F(DataStreamTest, BasicTest) {
+TEST_P(DataStreamTest, BasicTest) {
   // TODO: also test that all client connections have been returned
   TPartitionType::type stream_types[] =
       {TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
@@ -605,8 +760,8 @@ TEST_F(DataStreamTest, BasicTest) {
 // parent is destroyed. In practice the parent is a member of the query's 
runtime state.
 //
 // TODO: Make lifecycle requirements more explicit.
-TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
-  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), 
&exec_env_));
+TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
+  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), 
exec_env_.get()));
   RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
 
   // Start just one receiver.
@@ -628,7 +783,7 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   // RPC does not cause an error (the receiver will still be called, since it 
is only
   // Close()'d, not deleted from the data stream manager).
   Status rpc_status;
-  ImpalaBackendConnection client(exec_env_.impalad_client_cache(),
+  ImpalaBackendConnection client(exec_env_->impalad_client_cache(),
       MakeNetworkAddress("localhost", FLAGS_port), &rpc_status);
   EXPECT_OK(rpc_status);
   TTransmitDataParams params;
@@ -647,6 +802,49 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   stream_recvr.reset();
 }
 
+// This test is to exercise a previously present deadlock path which is now 
fixed, to
+// ensure that the deadlock does not happen anymore. It does this by doing the 
following:
+// This test starts multiple senders to send to the same receiver. It makes 
sure that
+// the senders' payloads reach the receiver before the receiver is setup. Once 
the
+// receiver is being created, it will notice that there are multiple payloads 
waiting
+// to be processed already and it would hold the KrpcDataStreamMgr::lock_ and 
call
+// TakeOverEarlySender() which calls EnqueueDeserializeTask() which tries to 
Offer()
+// the payload to the deserialization_pool_. However, we've set the queue size 
to 1,
+// which will cause the payload to be stuck on the Offer(). Now any payload 
that is
+// already being deserialized will be waiting on the KrpcDataStreamMgr::lock_ 
as well.
+// But the first thread will never release the lock since it's stuck on 
Offer(), causing
+// a deadlock. This is fixed with IMPALA-6346.
+TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
+  TUniqueId instance_id;
+  GetNextInstanceId(&instance_id);
+
+  // Start 4 senders.
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+
+  // Do a small sleep to ensure that the sent payloads reach before the 
receivers
+  // are created.
+  sleep(2);
+
+  // Setup the receiver.
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
+  receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
+  ReceiverInfo& info = receiver_info_.back();
+  info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
+      instance_id, DEST_NODE_ID, 4, 1024 * 1024, profile, false);
+  info.thread_handle = new thread(
+      &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, 
&info);
+
+  JoinSenders();
+  CheckSenders();
+  JoinReceivers();
+
+  // Check that 4 payloads have been received.
+  CheckReceivers(TPartitionType::UNPARTITIONED, 4);
+}
+
 // TODO: more tests:
 // - test case for transmission error in last batch
 // - receivers getting created concurrently

http://git-wip-us.apache.org/repos/asf/impala/blob/ff86feaa/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc 
b/be/src/runtime/krpc-data-stream-mgr.cc
index 3f777ea..fabea13 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -54,14 +54,17 @@ DECLARE_int32(datastream_sender_timeout_ms);
 DEFINE_int32(datastream_service_num_deserialization_threads, 16,
     "Number of threads for deserializing RPC requests deferred due to the 
receiver "
     "not ready or the soft limit of the receiver is reached.");
-
+DEFINE_int32(datastream_service_deserialization_queue_size, 10000,
+    "Number of deferred RPC requests that can be enqueued before being 
processed by a "
+    "deserialization thread.");
 using boost::mutex;
 
 namespace impala {
 
 KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
   : deserialize_pool_("data-stream-mgr", "deserialize",
-      FLAGS_datastream_service_num_deserialization_threads, 10000,
+      FLAGS_datastream_service_num_deserialization_threads,
+      FLAGS_datastream_service_deserialization_queue_size,
       boost::bind(&KrpcDataStreamMgr::DeserializeThreadFn, this, _1, _2)) {
   MetricGroup* dsm_metrics = 
metrics->GetOrCreateChildGroup("datastream-manager");
   num_senders_waiting_ =
@@ -102,6 +105,7 @@ shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(
       new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
           finst_id, dest_node_id, num_senders, is_merging, buffer_size, 
profile));
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  EarlySendersList early_senders_for_recvr;
   {
     RecvrId recvr_id = make_pair(finst_id, dest_node_id);
     lock_guard<mutex> l(lock_);
@@ -109,25 +113,31 @@ shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(
     receiver_map_.insert(make_pair(hash_value, recvr));
 
     EarlySendersMap::iterator it = early_senders_map_.find(recvr_id);
+
     if (it != early_senders_map_.end()) {
-      EarlySendersList& early_senders = it->second;
-      // Let the receiver take over the RPC payloads of early senders and 
process them
-      // asynchronously.
-      for (unique_ptr<TransmitDataCtx>& ctx : 
early_senders.waiting_sender_ctxs) {
-        // Release memory. The receiver will track it in its instance tracker.
-        int64_t transfer_size = ctx->rpc_context->GetTransferSize();
-        recvr->TakeOverEarlySender(move(ctx));
-        mem_tracker_->Release(transfer_size);
-        num_senders_waiting_->Increment(-1);
-      }
-      for (const unique_ptr<EndDataStreamCtx>& ctx : 
early_senders.closed_sender_ctxs) {
-        recvr->RemoveSender(ctx->request->sender_id());
-        RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, 
mem_tracker_);
-        num_senders_waiting_->Increment(-1);
-      }
+      // Move the early senders list here so that we can drop 'lock_'. We need 
to drop
+      // the lock before processing the early senders to avoid a deadlock.
+      // More details in IMPALA-6346.
+      early_senders_for_recvr = std::move(it->second);
       early_senders_map_.erase(it);
     }
   }
+
+  // Let the receiver take over the RPC payloads of early senders and process 
them
+  // asynchronously.
+  for (unique_ptr<TransmitDataCtx>& ctx : 
early_senders_for_recvr.waiting_sender_ctxs) {
+    // Release memory. The receiver will track it in its instance tracker.
+    int64_t transfer_size = ctx->rpc_context->GetTransferSize();
+    recvr->TakeOverEarlySender(move(ctx));
+    mem_tracker_->Release(transfer_size);
+    num_senders_waiting_->Increment(-1);
+  }
+  for (const unique_ptr<EndDataStreamCtx>& ctx :
+      early_senders_for_recvr.closed_sender_ctxs) {
+    recvr->RemoveSender(ctx->request->sender_id());
+    RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, 
mem_tracker_);
+    num_senders_waiting_->Increment(-1);
+  }
   return recvr;
 }
 

Reply via email to