IMPALA-2642: Fix a potential deadlock in statestore

The statestored can deadlock once the number of subscribers reaches
STATESTORE_MAX_SUBSCRIBERS: DoSubscriberUpdate() calls OfferUpdate()
while holding subscribers_lock_, and in this situation OfferUpdate()
tries to take the same lock again.

Fix the issue by removing the acquisition of subscribers_lock_ from
OfferUpdate() and requiring callers to hold it instead. We also make
the maximum number of statestore subscribers a start-up time tunable
(the statestore_max_subscribers flag), to allow us to test the limit
more easily.
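
A minimal sketch of the locking convention described above, for
illustration only. MiniStatestore and its members are hypothetical
stand-ins, not Impala's classes. The real OfferUpdate() checks a thread
pool's work queue, which worker threads drain; this sketch assumes a
plain deque that never drains, so the limit is hit deterministically.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <set>
#include <string>

// Hypothetical stand-in for the statestore; not Impala's actual class.
class MiniStatestore {
 public:
  explicit MiniStatestore(std::size_t max_subscribers)
      : max_subscribers_(max_subscribers) {}

  // Takes subscribers_lock_ once, then calls the helper with the lock held.
  // Returns false if the subscriber was rejected because the queue is full.
  bool RegisterSubscriber(const std::string& id) {
    std::lock_guard<std::mutex> l(subscribers_lock_);
    subscribers_.insert(id);
    return OfferUpdate(id);
  }

  std::size_t NumSubscribers() {
    std::lock_guard<std::mutex> l(subscribers_lock_);
    return subscribers_.size();
  }

 private:
  // Assumes subscribers_lock_ is held by the caller. Locking it again here
  // from the same thread would be undefined behaviour for std::mutex, which
  // in practice usually shows up as a self-deadlock.
  bool OfferUpdate(const std::string& id) {
    if (queue_.size() >= max_subscribers_) {
      // Queue is full: undo the registration and report failure.
      auto it = subscribers_.find(id);
      assert(it != subscribers_.end());
      subscribers_.erase(it);
      return false;
    }
    queue_.push_back(id);
    return true;
  }

  const std::size_t max_subscribers_;
  std::mutex subscribers_lock_;
  std::set<std::string> subscribers_;
  std::deque<std::string> queue_;  // Stand-in for the pending-update queue.
};
```

With a limit of 3, the first three RegisterSubscriber() calls in this
sketch succeed and the fourth is rejected without blocking, because the
lock is taken exactly once per call.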

Testing: The problem is easily reproduced by lowering the value of
STATESTORE_MAX_SUBSCRIBERS to 3, and then launching a mini cluster
with 3 impalads. Without the fix, the statestored becomes completely
deadlocked.

A new EE test has been added to exercise this scenario. The test
verifies that statestored correctly rejects new subscription
requests when the limit is reached.

Change-Id: I5d49dede221ce1f50ec299643b5532c61f93f0c6
Reviewed-on: http://gerrit.cloudera.org:8080/9038
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ca01c9b7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ca01c9b7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ca01c9b7

Branch: refs/heads/2.x
Commit: ca01c9b70f5d55b1f4c990c5f2897d110268bfff
Parents: 4e5f039
Author: Zoram Thanga <zo...@cloudera.com>
Authored: Tue Jan 16 12:01:09 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Fri Feb 2 01:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/statestore/statestore.cc                | 33 ++++----
 be/src/statestore/statestore.h                 |  2 +-
 tests/custom_cluster/test_custom_statestore.py | 88 +++++++++++++++++++++
 3 files changed, 108 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 0f72e58..8f4ddbf 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -62,6 +62,9 @@ DEFINE_int32(statestore_heartbeat_tcp_timeout_seconds, 3, "(Advanced) The time a
     "badly hung machines that are not able to respond to the heartbeat RPC in short "
     "order");
 
+DEFINE_int32(statestore_max_subscribers, 10000, "Used to control the maximum size "
+    "of the pending topic-update queue. There is at most one entry per subscriber.");
+
 // If this value is set too low, it's possible that UpdateState() might timeout during a
 // working invocation, and only a restart of the statestore with a change in value would
 // allow progress to be made. If set too high, a hung subscriber will waste an update
@@ -93,10 +96,6 @@ const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 // an item with the initial version.
 const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
 
-// Used to control the maximum size of the pending topic-update queue, in which there is
-// at most one entry per subscriber.
-const int32_t STATESTORE_MAX_SUBSCRIBERS = 10000;
-
 // Updates or heartbeats that miss their deadline by this much are logged.
 const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
 
@@ -216,12 +215,12 @@ Statestore::Statestore(MetricGroup* metrics)
     subscriber_topic_update_threadpool_("statestore-update",
         "subscriber-update-worker",
         FLAGS_statestore_num_update_threads,
-        STATESTORE_MAX_SUBSCRIBERS,
+        FLAGS_statestore_max_subscribers,
         bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, _2)),
     subscriber_heartbeat_threadpool_("statestore-heartbeat",
         "subscriber-heartbeat-worker",
         FLAGS_statestore_num_heartbeat_threads,
-        STATESTORE_MAX_SUBSCRIBERS,
+        FLAGS_statestore_max_subscribers,
         bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)),
     update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000,
@@ -349,11 +348,17 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
 
 Status Statestore::OfferUpdate(const ScheduledSubscriberUpdate& update,
     ThreadPool<ScheduledSubscriberUpdate>* threadpool) {
-  if (threadpool->GetQueueSize() >= STATESTORE_MAX_SUBSCRIBERS
+  // Somewhat confusingly, we're checking the number of entries in a particular
+  // threadpool's work queue to decide whether or not we have too many
+  // subscribers. The number of subscribers registered can be actually more
+  // than statestore_max_subscribers. This is because RegisterSubscriber() adds
+  // the new subscriber to subscribers_ first before scheduling its updates.
+  // Should we be stricter in enforcing this limit on subscribers_.size() itself?
+  if (threadpool->GetQueueSize() >= FLAGS_statestore_max_subscribers
       || !threadpool->Offer(update)) {
     stringstream ss;
-    ss << "Maximum subscriber limit reached: " << STATESTORE_MAX_SUBSCRIBERS;
-    lock_guard<mutex> l(subscribers_lock_);
+    ss << "Maximum subscriber limit reached: " << FLAGS_statestore_max_subscribers;
+    ss << ", subscribers_ size: " << subscribers_.size();
     SubscriberMap::iterator subscriber_it = subscribers_.find(update.subscriber_id);
     DCHECK(subscriber_it != subscribers_.end());
     subscribers_.erase(subscriber_it);
@@ -400,12 +405,12 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
         PrintId(current_registration->registration_id()), true);
     num_subscribers_metric_->SetValue(subscribers_.size());
     subscriber_set_metric_->Add(subscriber_id);
-  }
 
-  // Add the subscriber to the update queue, with an immediate schedule.
-  ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
-  RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
-  RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
+    // Add the subscriber to the update queue, with an immediate schedule.
+    ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
+    RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
+    RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
+  }
 
   LOG(INFO) << "Subscriber '" << subscriber_id << "' registered (registration id: "
             << PrintId(*registration_id) << ")";

http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 38b8361..deeb5aa 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -461,7 +461,7 @@ class Statestore : public CacheLineAligned {
   StatsMetric<double>* heartbeat_duration_metric_;
 
   /// Utility method to add an update to the given thread pool, and to fail if the thread
-  /// pool is already at capacity.
+  /// pool is already at capacity. Assumes that subscribers_lock_ is held by the caller.
   Status OfferUpdate(const ScheduledSubscriberUpdate& update,
       ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/tests/custom_cluster/test_custom_statestore.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_custom_statestore.py b/tests/custom_cluster/test_custom_statestore.py
new file mode 100644
index 0000000..ee810ed
--- /dev/null
+++ b/tests/custom_cluster/test_custom_statestore.py
@@ -0,0 +1,88 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests statestore with non-default startup options
+
+import logging
+import os
+import pytest
+import re
+import sys
+import uuid
+import socket
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+from Types.ttypes import TNetworkAddress
+from thrift.protocol import TBinaryProtocol
+from thrift.transport import TSocket, TTransport
+
+import StatestoreService.StatestoreSubscriber as Subscriber
+import StatestoreService.StatestoreService as Statestore
+from ErrorCodes.ttypes import TErrorCode
+
+LOG = logging.getLogger('custom_statestore_test')
+STATESTORE_SERVICE_PORT = 24000
+
+# A simple wrapper class to launch a cluster where we can tune various
+# startup parameters of the statestored to test correct boundary-value
+# behavior.
+class TestCustomStatestore(CustomClusterTestSuite):
+  # Grab a port the statestore subscribers will use to connect.
+  # Note that all subscribers we create below use this port to connect,
+  # with different subscriber IDs.
+  handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+  handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+  handle.bind(('localhost', 0))
+  _, port = handle.getsockname()
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  def __register_subscriber(self):
+    subscriber_id = "python-test-client-%s" % uuid.uuid4()
+    topics = []
+    request = Subscriber.TRegisterSubscriberRequest(topic_registrations=topics,
+      subscriber_location=TNetworkAddress("localhost", self.port),
+      subscriber_id=subscriber_id)
+    client_transport = \
+      TTransport.TBufferedTransport(TSocket.TSocket('localhost', STATESTORE_SERVICE_PORT))
+    protocol = TBinaryProtocol.TBinaryProtocol(client_transport)
+    client = Statestore.Client(protocol)
+    client_transport.open()
+    return client.RegisterSubscriber(request)
+
+  @CustomClusterTestSuite.with_args(statestored_args="-statestore_max_subscribers=3")
+  def test_statestore_max_subscribers(self):
+    """Test that the statestored correctly handles the condition where the number
+    of subscribers exceeds FLAGS_statestore_max_subscribers
+    (see be/src/statestore/statestore.cc). The expected behavior is for the
+    statestored to reject the subscription request once the threshold is
+    exceeded."""
+    # With a statestore_max_subscribers of 3, we should hit the registration error
+    # pretty quick.
+    for x in xrange(20):
+      response = self.__register_subscriber()
+      if response.status.status_code == TErrorCode.OK:
+        self.registration_id = response.registration_id
+        LOG.log(logging.INFO, "Registration id %s, x=%d" % (response.registration_id, x))
+      else:
+        assert 'Maximum subscriber limit reached:' in ''.join(response.status.error_msgs)
+        return
