Initiated query execution data structures (i.e., QueryContext) in all Shiftbosses.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a9f2ee16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a9f2ee16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a9f2ee16 Branch: refs/heads/multiple_shiftboss Commit: a9f2ee16de7d33e34afc93e2bd82e919b827f649 Parents: 17ffbb0 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Oct 9 18:44:00 2016 -0700 Committer: Zuyu Zhang <zu...@apache.org> Committed: Mon Oct 17 20:37:37 2016 -0700 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 15 ++++- query_execution/ForemanDistributed.hpp | 5 ++ query_execution/PolicyEnforcerBase.cpp | 2 + query_execution/PolicyEnforcerBase.hpp | 13 ++++ query_execution/PolicyEnforcerDistributed.cpp | 67 +++++++++----------- query_execution/QueryExecutionMessages.proto | 6 +- query_execution/QueryExecutionUtil.hpp | 13 ++++ query_execution/Shiftboss.cpp | 2 + .../DistributedExecutionGeneratorTestRunner.hpp | 2 +- 9 files changed, 84 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 9c20465..56b319b 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -18,6 +18,8 @@ #include <cstdio> #include <cstdlib> #include <memory> +#include <unordered_map> +#include <unordered_set> #include <utility> #include <vector> @@ -163,7 +165,9 @@ void ForemanDistributed::run() { break; } case kQueryInitiateResponseMessage: { - // TODO(zuyu): check the query id. 
+ S::QueryInitiateResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + CHECK(policy_enforcer_->existQuery(proto.query_id())); break; } case kCatalogRelationNewBlockMessage: // Fall through @@ -183,7 +187,14 @@ void ForemanDistributed::run() { S::SaveQueryResultResponseMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id()); + const std::size_t query_id = proto.query_id(); + query_result_saved_shiftbosses_[query_id].insert(proto.shiftboss_index()); + + // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. + if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) { + processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id()); + query_result_saved_shiftbosses_.erase(query_id); + } break; } case kPoisonMessage: { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp index fc1ede5..b42795c 100644 --- a/query_execution/ForemanDistributed.hpp +++ b/query_execution/ForemanDistributed.hpp @@ -18,6 +18,8 @@ #include <cstddef> #include <cstdio> #include <memory> +#include <unordered_map> +#include <unordered_set> #include <vector> #include "catalog/CatalogTypedefs.hpp" @@ -120,6 +122,9 @@ class ForemanDistributed final : public ForemanBase { std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_; + // From a query id to a set of Shiftbosses that save query result. 
+ std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_; + DISALLOW_COPY_AND_ASSIGN(ForemanDistributed); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index 4174bd6..745ded6 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -142,6 +142,8 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) { << " that hasn't finished its execution"; } admitted_queries_.erase(query_id); + + removed_query_ids_.insert(query_id); } bool PolicyEnforcerBase::admitQueries( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index 62906e9..25da598 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -24,6 +24,7 @@ #include <memory> #include <queue> #include <unordered_map> +#include <unordered_set> #include <vector> #include "query_execution/QueryExecutionTypedefs.hpp" @@ -106,6 +107,16 @@ class PolicyEnforcerBase { void processMessage(const TaggedMessage &tagged_message); /** + * @brief Check if the given query id ever exists. + * + * @return True if the query ever exists, otherwise false. + **/ + inline bool existQuery(const std::size_t query_id) const { + return admitted_queries_.find(query_id) != admitted_queries_.end() || + removed_query_ids_.find(query_id) != removed_query_ids_.end(); + } + + /** * @brief Check if there are any queries to be executed. 
* * @return True if there is at least one active or waiting query, false if @@ -163,6 +174,8 @@ class PolicyEnforcerBase { // Key = query ID, value = QueryManagerBase* for the key query. std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_; + std::unordered_set<std::size_t> removed_query_ids_; + // The queries which haven't been admitted yet. std::queue<QueryHandle*> waiting_queries_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 47491ed..c06fd86 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -37,6 +37,7 @@ #include "gflags/gflags.h" #include "glog/logging.h" +#include "tmb/address.h" #include "tmb/id_typedefs.h" #include "tmb/message_bus.h" #include "tmb/tagged_message.h" @@ -170,25 +171,18 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand kQueryInitiateMessage); free(proto_bytes); - // TODO(zuyu): Multiple Shiftbosses support. + // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. + tmb::Address shiftboss_addresses; + for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) { + shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i)); + } + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage - << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftboss_directory_->getClientId(0), - move(message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); - - // Wait Shiftboss for QueryInitiateResponseMessage. 
- const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type()); - DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type() - << "' message from client " << annotated_message.sender; - - S::QueryInitiateResponseMessage proto_response; - CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + << "') to all Shiftbosses"; + QueryExecutionUtil::BroadcastMessage(foreman_client_id_, + shiftboss_addresses, + move(message), + bus_); } void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) { @@ -198,8 +192,14 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage const tmb::client_id cli_id = query_handle->getClientId(); const std::size_t query_id = query_handle->query_id(); + // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. + tmb::Address shiftboss_addresses; + for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) { + shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i)); + } + if (query_result == nullptr) { - // Clean up query execution states, i.e., QueryContext, in Shiftboss. + // Clean up query execution states, i.e., QueryContext, in Shiftbosses. serialization::QueryTeardownMessage proto; proto.set_query_id(query_id); @@ -211,15 +211,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage proto_length, kQueryTeardownMessage); - // TODO(zuyu): Support multiple shiftbosses. 
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage - << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); - tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftboss_directory_->getClientId(0), - move(message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); + << "') to all Shiftbosses"; + QueryExecutionUtil::BroadcastMessage(foreman_client_id_, + shiftboss_addresses, + move(message), + bus_); TaggedMessage cli_message(kQueryExecutionSuccessMessage); @@ -227,7 +224,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage << "') to CLI with TMB client id " << cli_id; - send_status = + const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, @@ -257,15 +254,13 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage kSaveQueryResultMessage); free(proto_bytes); - // TODO(zuyu): Support multiple shiftbosses. + // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. 
DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage - << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftboss_directory_->getClientId(0), - move(message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); + << "') to all Shiftbosses"; + QueryExecutionUtil::BroadcastMessage(foreman_client_id_, + shiftboss_addresses, + move(message), + bus_); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 060efa1..1a2cb78 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -128,8 +128,10 @@ message SaveQueryResultMessage { } message SaveQueryResultResponseMessage { - required int32 relation_id = 1; - required uint32 cli_id = 2; // tmb::client_id. + required uint64 query_id = 1; + required int32 relation_id = 2; + required uint32 cli_id = 3; // tmb::client_id. 
+ required uint64 shiftboss_index = 4; } message QueryExecutionSuccessMessage { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/QueryExecutionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp index 7a3a3b3..b41965c 100644 --- a/query_execution/QueryExecutionUtil.hpp +++ b/query_execution/QueryExecutionUtil.hpp @@ -121,6 +121,19 @@ class QueryExecutionUtil { DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type()); } + static void BroadcastMessage(const tmb::client_id sender_id, + const tmb::Address &addresses, + tmb::TaggedMessage &&tagged_message, // NOLINT(whitespace/operators) + tmb::MessageBus *bus) { + // The sender broadcasts the given message to all 'addresses'. + tmb::MessageStyle style; + style.Broadcast(true); + + const tmb::MessageBus::SendStatus send_status = + bus->Send(sender_id, addresses, style, std::move(tagged_message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); + } + static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) { // Terminate all threads. // The sender thread broadcasts poison message to the workers and foreman. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index 5c2c5e0..a434527 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -189,8 +189,10 @@ void Shiftboss::run() { query_contexts_.erase(proto.query_id()); serialization::SaveQueryResultResponseMessage proto_response; + proto_response.set_query_id(proto.query_id()); proto_response.set_relation_id(proto.relation_id()); proto_response.set_cli_id(proto.cli_id()); + proto_response.set_shiftboss_index(shiftboss_index_); const size_t proto_response_length = proto_response.ByteSize(); char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a9f2ee16/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp index e4d0765..ab10841 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp @@ -49,7 +49,7 @@ namespace quickstep { namespace optimizer { namespace { -constexpr int kNumInstances = 1; +constexpr int kNumInstances = 3; } // namespace /**