Initiated query execution data structure in all Shiftbosses.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a9f2ee16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a9f2ee16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a9f2ee16

Branch: refs/heads/multiple_shiftboss
Commit: a9f2ee16de7d33e34afc93e2bd82e919b827f649
Parents: 17ffbb0
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Oct 9 18:44:00 2016 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Oct 17 20:37:37 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp          | 15 ++++-
 query_execution/ForemanDistributed.hpp          |  5 ++
 query_execution/PolicyEnforcerBase.cpp          |  2 +
 query_execution/PolicyEnforcerBase.hpp          | 13 ++++
 query_execution/PolicyEnforcerDistributed.cpp   | 67 +++++++++-----------
 query_execution/QueryExecutionMessages.proto    |  6 +-
 query_execution/QueryExecutionUtil.hpp          | 13 ++++
 query_execution/Shiftboss.cpp                   |  2 +
 .../DistributedExecutionGeneratorTestRunner.hpp |  2 +-
 9 files changed, 84 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp 
b/query_execution/ForemanDistributed.cpp
index 9c20465..56b319b 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -18,6 +18,8 @@
 #include <cstdio>
 #include <cstdlib>
 #include <memory>
+#include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -163,7 +165,9 @@ void ForemanDistributed::run() {
         break;
       }
       case kQueryInitiateResponseMessage: {
-        // TODO(zuyu): check the query id.
+        S::QueryInitiateResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
+        CHECK(policy_enforcer_->existQuery(proto.query_id()));
         break;
       }
       case kCatalogRelationNewBlockMessage:  // Fall through
@@ -183,7 +187,14 @@ void ForemanDistributed::run() {
         S::SaveQueryResultResponseMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
 
-        processSaveQueryResultResponseMessage(proto.cli_id(), 
proto.relation_id());
+        const std::size_t query_id = proto.query_id();
+        
query_result_saved_shiftbosses_[query_id].insert(proto.shiftboss_index());
+
+        // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+        if (query_result_saved_shiftbosses_[query_id].size() == 
shiftboss_directory_.size()) {
+          processSaveQueryResultResponseMessage(proto.cli_id(), 
proto.relation_id());
+          query_result_saved_shiftbosses_.erase(query_id);
+        }
         break;
       }
       case kPoisonMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp 
b/query_execution/ForemanDistributed.hpp
index fc1ede5..b42795c 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -18,6 +18,8 @@
 #include <cstddef>
 #include <cstdio>
 #include <memory>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -120,6 +122,9 @@ class ForemanDistributed final : public ForemanBase {
 
   std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
 
+  // From a query id to a set of Shiftbosses that save query result.
+  std::unordered_map<std::size_t, std::unordered_set<std::size_t>> 
query_result_saved_shiftbosses_;
+
   DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp 
b/query_execution/PolicyEnforcerBase.cpp
index 4174bd6..745ded6 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -142,6 +142,8 @@ void PolicyEnforcerBase::removeQuery(const std::size_t 
query_id) {
                  << " that hasn't finished its execution";
   }
   admitted_queries_.erase(query_id);
+
+  removed_query_ids_.insert(query_id);
 }
 
 bool PolicyEnforcerBase::admitQueries(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp 
b/query_execution/PolicyEnforcerBase.hpp
index 62906e9..25da598 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -24,6 +24,7 @@
 #include <memory>
 #include <queue>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
@@ -106,6 +107,16 @@ class PolicyEnforcerBase {
   void processMessage(const TaggedMessage &tagged_message);
 
   /**
+   * @brief Check if the given query id ever exists.
+   *
+   * @return True if the query ever exists, otherwise false.
+   **/
+  inline bool existQuery(const std::size_t query_id) const {
+    return admitted_queries_.find(query_id) != admitted_queries_.end() ||
+           removed_query_ids_.find(query_id) != removed_query_ids_.end();
+  }
+
+  /**
    * @brief Check if there are any queries to be executed.
    *
    * @return True if there is at least one active or waiting query, false if
@@ -163,6 +174,8 @@ class PolicyEnforcerBase {
   // Key = query ID, value = QueryManagerBase* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> 
admitted_queries_;
 
+  std::unordered_set<std::size_t> removed_query_ids_;
+
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp 
b/query_execution/PolicyEnforcerDistributed.cpp
index 47491ed..c06fd86 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -37,6 +37,7 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#include "tmb/address.h"
 #include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
@@ -170,25 +171,18 @@ void 
PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
                         kQueryInitiateMessage);
   free(proto_bytes);
 
-  // TODO(zuyu): Multiple Shiftbosses support.
+  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+  tmb::Address shiftboss_addresses;
+  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+  }
+
   DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" 
<< kQueryInitiateMessage
-             << "') to Shiftboss with TMB client ID " << 
shiftboss_directory_->getClientId(0);
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         foreman_client_id_,
-                                         shiftboss_directory_->getClientId(0),
-                                         move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-
-  // Wait Shiftboss for QueryInitiateResponseMessage.
-  const tmb::AnnotatedMessage annotated_message = 
bus_->Receive(foreman_client_id_, 0, true);
-  const TaggedMessage &tagged_message = annotated_message.tagged_message;
-  DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
-  DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << 
tagged_message.message_type()
-             << "' message from client " << annotated_message.sender;
-
-  S::QueryInitiateResponseMessage proto_response;
-  CHECK(proto_response.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
+             << "') to all Shiftbosses";
+  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+                                       shiftboss_addresses,
+                                       move(message),
+                                       bus_);
 }
 
 void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase 
*query_manager) {
@@ -198,8 +192,14 @@ void 
PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   const tmb::client_id cli_id = query_handle->getClientId();
   const std::size_t query_id = query_handle->query_id();
 
+  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+  tmb::Address shiftboss_addresses;
+  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+  }
+
   if (query_result == nullptr) {
-    // Clean up query execution states, i.e., QueryContext, in Shiftboss.
+    // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
     serialization::QueryTeardownMessage proto;
     proto.set_query_id(query_id);
 
@@ -211,15 +211,12 @@ void 
PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                           proto_length,
                           kQueryTeardownMessage);
 
-    // TODO(zuyu): Support multiple shiftbosses.
     DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed 
'" << kQueryTeardownMessage
-               << "') to Shiftboss with TMB client ID " << 
shiftboss_directory_->getClientId(0);
-    tmb::MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           foreman_client_id_,
-                                           
shiftboss_directory_->getClientId(0),
-                                           move(message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+               << "') to all Shiftbosses";
+    QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+                                         shiftboss_addresses,
+                                         move(message),
+                                         bus_);
 
     TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
@@ -227,7 +224,7 @@ void 
PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage 
(typed '"
                << kQueryExecutionSuccessMessage
                << "') to CLI with TMB client id " << cli_id;
-    send_status =
+    const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            cli_id,
@@ -257,15 +254,13 @@ void 
PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                         kSaveQueryResultMessage);
   free(proto_bytes);
 
-  // TODO(zuyu): Support multiple shiftbosses.
+  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
   DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed 
'" << kSaveQueryResultMessage
-             << "') to Shiftboss with TMB client ID " << 
shiftboss_directory_->getClientId(0);
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         foreman_client_id_,
-                                         shiftboss_directory_->getClientId(0),
-                                         move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+             << "') to all Shiftbosses";
+  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+                                       shiftboss_addresses,
+                                       move(message),
+                                       bus_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto 
b/query_execution/QueryExecutionMessages.proto
index 060efa1..1a2cb78 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -128,8 +128,10 @@ message SaveQueryResultMessage {
 }
 
 message SaveQueryResultResponseMessage {
-  required int32 relation_id = 1;
-  required uint32 cli_id = 2;  // tmb::client_id.
+  required uint64 query_id = 1;
+  required int32 relation_id = 2;
+  required uint32 cli_id = 3;  // tmb::client_id.
+  required uint64 shiftboss_index = 4;
 }
 
 message QueryExecutionSuccessMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp 
b/query_execution/QueryExecutionUtil.hpp
index 7a3a3b3..b41965c 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -121,6 +121,19 @@ class QueryExecutionUtil {
     DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
   }
 
+  static void BroadcastMessage(const tmb::client_id sender_id,
+                               const tmb::Address &addresses,
+                               tmb::TaggedMessage &&tagged_message,  // 
NOLINT(whitespace/operators)
+                               tmb::MessageBus *bus) {
+    // The sender broadcasts the given message to all 'addresses'.
+    tmb::MessageStyle style;
+    style.Broadcast(true);
+
+    const tmb::MessageBus::SendStatus send_status =
+        bus->Send(sender_id, addresses, style, std::move(tagged_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+  }
+
   static void BroadcastPoisonMessage(const tmb::client_id sender_id, 
tmb::MessageBus *bus) {
     // Terminate all threads.
     // The sender thread broadcasts poison message to the workers and foreman.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 5c2c5e0..a434527 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -189,8 +189,10 @@ void Shiftboss::run() {
         query_contexts_.erase(proto.query_id());
 
         serialization::SaveQueryResultResponseMessage proto_response;
+        proto_response.set_query_id(proto.query_id());
         proto_response.set_relation_id(proto.relation_id());
         proto_response.set_cli_id(proto.cli_id());
+        proto_response.set_shiftboss_index(shiftboss_index_);
 
         const size_t proto_response_length = proto_response.ByteSize();
         char *proto_response_bytes = 
static_cast<char*>(malloc(proto_response_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp 
b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index e4d0765..ab10841 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -49,7 +49,7 @@ namespace quickstep {
 namespace optimizer {
 
 namespace {
-constexpr int kNumInstances = 1;
+constexpr int kNumInstances = 3;
 }  // namespace
 
 /**

Reply via email to