Repository: incubator-quickstep Updated Branches: refs/heads/refactor-hashjoin-probe-build 7c5bdf92e -> a61b99e9e (forced update)
Cleaned up the messages w/ a dummy payload. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccea2ff8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccea2ff8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccea2ff8 Branch: refs/heads/refactor-hashjoin-probe-build Commit: ccea2ff83ea73e950d52c152cc422a9e93cf6aad Parents: 52a32a3 Author: Zuyu Zhang <zu...@twitter.com> Authored: Fri Jul 29 23:52:33 2016 -0700 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Mon Aug 1 10:11:19 2016 -0700 ---------------------------------------------------------------------- query_execution/ForemanSingleNode.cpp | 4 +- query_execution/QueryExecutionMessages.proto | 4 - query_execution/QueryExecutionUtil.hpp | 5 +- query_execution/README.md | 110 +++++++++---------- query_execution/WorkerMessage.hpp | 13 +-- query_execution/tests/BlockLocator_unittest.cpp | 11 +- storage/tests/DataExchange_unittest.cpp | 11 +- 7 files changed, 60 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/ForemanSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp index cda02a7..d2b56ae 100644 --- a/query_execution/ForemanSingleNode.cpp +++ b/query_execution/ForemanSingleNode.cpp @@ -167,9 +167,7 @@ void ForemanSingleNode::run() { if (!policy_enforcer_->hasQueries()) { // Signal the main thread that there are no queries to be executed. // Currently the message doesn't have any real content. 
- const int dummy_payload = 0; - TaggedMessage completion_tagged_message( - &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage); + TaggedMessage completion_tagged_message(kWorkloadCompletionMessage); const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 308d736..f2219f6 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -20,10 +20,6 @@ import "catalog/Catalog.proto"; import "query_execution/QueryContext.proto"; import "relational_operators/WorkOrder.proto"; -// Used for any messages that do not carry payloads. -message EmptyMessage { -} - // Note: There are different types of completion messages for normal work orders // rebuild work orders. This can be potentially helpful when we want to collect // different statistics for executing different types of work orders. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp index 6ea4a29..5994f22 100644 --- a/query_execution/QueryExecutionUtil.hpp +++ b/query_execution/QueryExecutionUtil.hpp @@ -123,10 +123,7 @@ class QueryExecutionUtil { style.Broadcast(true); Address address; address.All(true); - std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage()); - TaggedMessage poison_tagged_message(poison_message.get(), - sizeof(*poison_message), - kPoisonMessage); + TaggedMessage poison_tagged_message(kPoisonMessage); const tmb::MessageBus::SendStatus send_status = bus->Send( sender_id, address, style, std::move(poison_tagged_message)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/README.md ---------------------------------------------------------------------- diff --git a/query_execution/README.md b/query_execution/README.md index 22ad91d..12e0f57 100644 --- a/query_execution/README.md +++ b/query_execution/README.md @@ -3,19 +3,19 @@ ## Types of threads There are two kinds of threads in Quickstep - Foreman and Worker. The foreman thread controls the query execution progress, finds schedulable work (called as -WorkOrder) and assigns (or schedules) it for execution to the Worker threads. +WorkOrder) and assigns (or schedules) it for execution to the Worker threads. The Worker threads receive the WorkOrders and execute them. After execution they -send a completion message (or response message) back to Foreman. +send a completion message (or response message) back to Foreman. 
## High level functionality of Foreman -Foreman requests all the RelationalOperators in the physical query plan +Foreman requests all the RelationalOperators in the physical query plan represented as a DAG to give any schedulable work (in the form of WorkOrders). While doing so, Foreman has to respect dependencies between operators. There are -two kinds of dependencies between operators - pipeline breaking (or blocking) +two kinds of dependencies between operators - pipeline breaking (or blocking) and pipeline non-breaking (or non-blocking). In the first case, the output of the producer operator can't be pipelined to the consumer operator. In the second case, the Foreman will facilitate the pipelining of the intermediate output -produced by the producer operator to the consumer operator. +produced by the producer operator to the consumer operator. ## Messages in execution engine @@ -26,110 +26,110 @@ of the message. Foreman -> Worker : WorkerMessage which consists of the following things - A pointer to the WorkOrder to be executed. The WorkOrder could be a normal WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of -WorkOrder::execute() method which is overriden by all of the RelationalOperator -classes. A rebuild WorkOrder has one StorageBlock as input and calls a +WorkOrder::execute() method which is overriden by all of the RelationalOperator +classes. A rebuild WorkOrder has one StorageBlock as input and calls a rebuild() method on the block. More details about rebuild() can be found in the -storage module. +storage module. - The index of the relational operator in the query plan DAG that produced the -WorkOrder. - -Main thread -> Worker : WorkerMessage of type PoisonMessage. This message is -used to terminate the Worker thread, typically when shutting down the Quickstep -process. +WorkOrder. ### ForemanMessage -Multiple senders are possible for this message. There are multiple types of +Multiple senders are possible for this message. 
There are multiple types of ForemanMessages, each of which indicates the purpose of the message. -Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and -RebuildCompletion are sent after a Worker finishes executing a respective type -of WorkOrder. This message helps the Foreman track the progress of individual -operators as well as the whole query. +Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and +RebuildCompletion are sent after a Worker finishes executing a respective type +of WorkOrder. This message helps the Foreman track the progress of individual +operators as well as the whole query. Some relational operators and InsertDestination -> Foreman : ForemanMessage of -types DataPipeline and WorkOrdersAvailable. InsertDestination first determines +types DataPipeline and WorkOrdersAvailable. InsertDestination first determines when an output block of a relational operator gets full. Once a block is full, -it streams the unique block ID of the filled block along with the index of the -relational operator that produced the block to Foreman with the message type +it streams the unique block ID of the filled block along with the index of the +relational operator that produced the block to Foreman with the message type DataPipeline. Some operators which modify the block in place also send similar -messages to Foreman. +messages to Foreman. ### FeedbackMessage This message is sent from Workers to the Foreman during a WorkOrder execution. In certain operators, e.g. TextScan (used for bulk loading data from text files) -and Sort, there is a communication between the relational operator and its -WorkOrders. In such cases, when a WorkOrder is under execution on a Worker +and Sort, there is a communication between the relational operator and its +WorkOrders. In such cases, when a WorkOrder is under execution on a Worker thread, a FeedbackMessage is sent from the WorkOrder via the Worker to Foreman. 
Foreman relays this message to the relational operator that produced the sender -WorkOrder. The relational operator uses this message to update its internal -state to potentially generate newer WorkOrders. +WorkOrder. The relational operator uses this message to update its internal +state to potentially generate newer WorkOrders. + +### PoisonMessage +This message is used to terminate a thread (e.g., the Foreman or a Worker), typically +when shutting down the Quickstep process. ## How does the Foreman react after receiving various messages? ### WorkOrder completion message * Update the book-keeping of pending WorkOrders per Worker and per operator. * Fetch new WorkOrders if available for the operator of whose WorkOrder was -just executed. +just executed. * Update the state of an operator - the possible options are: - Normal WorkOrders are still under execution - All normal WorkOrders have finished execution and rebuild WorkOrders are yet - to be generated. + to be generated. - All normal WorkOrders have finished execution, rebuild WorkOrders have been - generated and issued to Workers. + generated and issued to Workers. - All normal and rebuild WorkOrders have been executed AND all the dependency - operators for the given operator have finished execution, therefore the given - operator has finished its execution. -* Fetch the WorkOrders from the dependents of the given operator. + operators for the given operator have finished execution, therefore the given + operator has finished its execution. +* Fetch the WorkOrders from the dependents of the given operator. ### Rebuild WorkOrder completion message * Update the book-keeping of pending WorkOrders per Worker and per operator. * If all the rebuild WorkOrders have finished their execution, try to fetch the WorkOrders of the dependent operators of the operator whose rebuild WorkOrder -was just executed. +was just executed. ### Data pipeline message -* Find the consumer operators (i.e.
operators which have a non -pipeline-breaking link) of the producer operator. -* Stream the block ID to the eligible consumer operators. -* Fetch new WorkOrders from these consumer operators which may have become -available because of the streaming of data. +* Find the consumer operators (i.e. operators which have a non +pipeline-breaking link) of the producer operator. +* Stream the block ID to the eligible consumer operators. +* Fetch new WorkOrders from these consumer operators which may have become +available because of the streaming of data. ### WorkOrder available message * Fetch new WorkOrders that may have become available. ### Feedback message -* Relay the feedback message to a specified relational operator. The recipient -operator is specified in the header of the message. +* Relay the feedback message to a specified relational operator. The recipient +operator is specified in the header of the message. ## Example -We look at a sample query to better describe the flow of messages - +We look at a sample query to better describe the flow of messages - SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20; -This is an equi-join query which can be implemented using a hash join. We assume -that S is a larger relation and the build relation is the output of the +This is an equi-join query which can be implemented using a hash join. We assume +that S is a larger relation and the build relation is the output of the selection on R. The query execution plan involves the following operators: -* SelectOperator to filter R based on predicate R.c < 20 (We call the output as -R') +* SelectOperator to filter R based on predicate R.c < 20 (We call the output as +R') * BuildHashOperator to construct a hash table on R' * HashJoinOperator to probe the hash table, where the probe relation is S * DestroyHashTableOperator to destroy the hash table after the join is done -* Multiple DropTableOperators to destroy the temporaray relations produced as -output. 
+* Multiple DropTableOperators to destroy the temporary relations produced as +output. R has two blocks with IDs as 1 and 2. S has two blocks with IDs as 3 and 4. -We assume that the SelectOperator produces one filled block and one partially -filled block as output. Note that in the query plan DAG, the link between -SelectOperator and BuildHashOperator allows streaming of data. The -HashJoinOperator's WorkOrder can't be generated unless all of the +We assume that the SelectOperator produces one filled block and one partially +filled block as output. Note that in the query plan DAG, the link between +SelectOperator and BuildHashOperator allows streaming of data. The +HashJoinOperator's WorkOrder can't be generated unless all of the BuildHashOperator's WorkOrders have finished their execution. The execution is -assumed to be performed by a single Worker thread. +assumed to be performed by a single Worker thread. -The following table describes the message exchange that happens during the -query excution. We primarily focus on three operators - Select, BuildHash and -HashJoin (probe). +The following table describes the message exchange that happens during the +query execution. We primarily focus on three operators - Select, BuildHash and +HashJoin (probe). 
| Sender | Receiver | Message | Message Description | |:-----------------:|----------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/WorkerMessage.hpp ---------------------------------------------------------------------- diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp index 560c1ba..a0434de 100644 --- a/query_execution/WorkerMessage.hpp +++ b/query_execution/WorkerMessage.hpp @@ -35,7 +35,6 @@ class WorkerMessage { enum class WorkerMessageType { kRebuildWorkOrder = 0, kWorkOrder, - kPoison }; /** @@ -70,15 +69,6 @@ class WorkerMessage { } /** - * @brief A static factory method for generating a poison message. - * - * @return The constructed PoisonMessage. - **/ - static WorkerMessage* PoisonMessage() { - return new WorkerMessage(nullptr, 0, WorkerMessageType::kPoison); - } - - /** * @brief Destructor. **/ ~WorkerMessage() { @@ -128,8 +118,7 @@ class WorkerMessage { /** * @brief Constructor. * - * @param work_unit The work order to be executed by the worker. A NULL - * workorder indicates a poison message. + * @param work_unit The work order to be executed by the worker. * @param relational_op_index The index of the relational operator in the * query plan DAG that generated the given WorkOrder. * @param type Type of the WorkerMessage. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/tests/BlockLocator_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp index fe7b86b..fd25e9e 100644 --- a/query_execution/tests/BlockLocator_unittest.cpp +++ b/query_execution/tests/BlockLocator_unittest.cpp @@ -90,16 +90,7 @@ class BlockLocatorTest : public ::testing::Test { virtual void TearDown() { storage_manager_.reset(); - serialization::EmptyMessage proto; - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kPoisonMessage); - free(proto_bytes); + TaggedMessage message(kPoisonMessage); LOG(INFO) << "Worker (id '" << worker_client_id_ << "') sent PoisonMessage (typed '" << kPoisonMessage http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/storage/tests/DataExchange_unittest.cpp ---------------------------------------------------------------------- diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp index 38d12f6..4bad17b 100644 --- a/storage/tests/DataExchange_unittest.cpp +++ b/storage/tests/DataExchange_unittest.cpp @@ -105,16 +105,7 @@ class DataExchangeTest : public ::testing::Test { data_exchanger_expected_.shutdown(); storage_manager_expected_.reset(); - serialization::EmptyMessage proto; - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kPoisonMessage); - free(proto_bytes); + TaggedMessage message(kPoisonMessage); LOG(INFO) << "Worker (id '" << worker_client_id_ 
<< "') sent PoisonMessage (typed '" << kPoisonMessage