http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp index 8352d55..9204073 100644 --- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp @@ -61,9 +61,11 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner { bus_.Initialize(); - foreman_.reset(new Foreman(&bus_, - test_database_loader_.catalog_database(), - test_database_loader_.storage_manager())); + main_thread_client_id_ = bus_.Connect(); + bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage); + bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage); + bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage); + worker_.reset(new Worker(0, &bus_)); std::vector<client_id> worker_client_ids; @@ -75,27 +77,20 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner { workers_.reset(new WorkerDirectory(1 /* number of workers */, worker_client_ids, numa_nodes)); - foreman_->setWorkerDirectory(workers_.get()); + foreman_.reset(new Foreman(main_thread_client_id_, + workers_.get(), + &bus_, + test_database_loader_.catalog_database(), + test_database_loader_.storage_manager())); + foreman_->start(); worker_->start(); } ~ExecutionGeneratorTestRunner() { - std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage()); - TaggedMessage poison_tagged_message(poison_message.get(), - sizeof(*poison_message), - quickstep::kPoisonMessage); - - Address worker_address; - MessageStyle single_receiver_style; - - worker_address.AddRecipient(worker_->getBusClientID()); - bus_.Send(foreman_->getBusClientID(), - worker_address, - single_receiver_style, - std::move(poison_tagged_message)); - + QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id_, &bus_); worker_->join(); + foreman_->join(); } void runTestCase(const std::string &input, @@ -112,6 +107,8 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner { std::unique_ptr<WorkerDirectory> workers_; + tmb::client_id main_thread_client_id_; + // This map is needed for InsertDestination and some operators that send // messages to Foreman directly. To know the reason behind the design of this // map, see the note in InsertDestination.hpp.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/TestDatabaseLoader.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/TestDatabaseLoader.cpp b/query_optimizer/tests/TestDatabaseLoader.cpp index 2de69b6..764ff2f 100644 --- a/query_optimizer/tests/TestDatabaseLoader.cpp +++ b/query_optimizer/tests/TestDatabaseLoader.cpp @@ -122,6 +122,7 @@ void TestDatabaseLoader::loadTestRelation() { nullptr, &storage_manager_, 0 /* dummy op index */, + 0, // dummy query ID. scheduler_client_id_, &bus_); int sign = 1; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/DeleteOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp index 47e36e9..933918b 100644 --- a/relational_operators/DeleteOperator.cpp +++ b/relational_operators/DeleteOperator.cpp @@ -132,6 +132,7 @@ void DeleteWorkOrder::execute() { proto.set_operator_index(delete_operator_index_); proto.set_block_id(input_block_id_); proto.set_relation_id(input_relation_.getID()); + proto.set_query_id(query_id_); // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const std::size_t proto_length = proto.ByteSize(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/DeleteOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp index aa8a688..74da8c1 100644 --- a/relational_operators/DeleteOperator.hpp +++ b/relational_operators/DeleteOperator.hpp @@ -174,6 +174,7 @@ class DeleteWorkOrder : public WorkOrder { StorageManager *storage_manager_; const std::size_t delete_operator_index_; + const tmb::client_id scheduler_client_id_; MessageBus *bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index 6f4271d..9762f04 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -644,14 +644,14 @@ class HashOuterJoinWorkOrder : public WorkOrder { * @param join_key_attributes The IDs of equijoin attributes in \c * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param hash_table The JoinHashTable to use. + * @param lookup_block_id The block id of the probe_relation. * @param selection A list of Scalars corresponding to the relation attributes * in \c output_destination. Each Scalar is evaluated for the joined * tuples, and the resulting value is inserted into the join result. * @param is_selection_on_build Whether each Scalar in the \p selection vector * is using attributes from the build relation as input. Note that the * length of this vector should equal the length of \p selection. - * @param lookup_block_id The block id of the probe_relation. + * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. **/ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/RebuildWorkOrder.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp index 86f8eaf..3125447 100644 --- a/relational_operators/RebuildWorkOrder.hpp +++ b/relational_operators/RebuildWorkOrder.hpp @@ -85,6 +85,7 @@ class RebuildWorkOrder : public WorkOrder { proto.set_operator_index(input_operator_index_); proto.set_block_id(block_ref_->getID()); proto.set_relation_id(input_relation_id_); + proto.set_query_id(query_id_); // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const std::size_t proto_length = proto.ByteSize(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/SortMergeRunOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp index e398d62..1603b78 100644 --- a/relational_operators/SortMergeRunOperator.cpp +++ b/relational_operators/SortMergeRunOperator.cpp @@ -327,6 +327,7 @@ void SortMergeRunWorkOrder::execute() { // Send completion message to operator. FeedbackMessage msg(SortMergeRunOperator::kRunOutputMessage, + getQueryID(), operator_index_, serialized_output.first, serialized_output.second); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/UpdateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp index 1b2979e..f103b0e 100644 --- a/relational_operators/UpdateOperator.cpp +++ b/relational_operators/UpdateOperator.cpp @@ -114,6 +114,7 @@ void UpdateWorkOrder::execute() { proto.set_operator_index(update_operator_index_); proto.set_block_id(input_block_id_); proto.set_relation_id(relation_.getID()); + proto.set_query_id(query_id_); // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const std::size_t proto_length = proto.ByteSize(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/UpdateOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp index f6c5053..4471a17 100644 --- a/relational_operators/UpdateOperator.hpp +++ b/relational_operators/UpdateOperator.hpp @@ -184,6 +184,7 @@ class UpdateWorkOrder : public WorkOrder { StorageManager *storage_manager_; const std::size_t update_operator_index_; + const tmb::client_id scheduler_client_id_; MessageBus *bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/WorkOrder.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp index 059865d..df195cc 100644 --- a/relational_operators/WorkOrder.hpp +++ b/relational_operators/WorkOrder.hpp @@ -65,20 +65,25 @@ class WorkOrder { * relational operator. */ struct FeedbackMessageHeader { + std::size_t query_id; std::size_t rel_op_index; std::size_t payload_size; FeedbackMessageType payload_type; /** * @brief Header constructor. + * + * @param query_id The ID of the query. * @param relational_op_index Index of the relation operator. * @param payload_size Size of the payload of the message. * @param payload_type Type of payload. */ - FeedbackMessageHeader(const std::size_t relational_op_index, + FeedbackMessageHeader(const std::size_t query_id, + const std::size_t relational_op_index, const std::size_t payload_size, const FeedbackMessageType payload_type) - : rel_op_index(relational_op_index), + : query_id(query_id), + rel_op_index(relational_op_index), payload_size(payload_size), payload_type(payload_type) {} }; @@ -93,17 +98,19 @@ class WorkOrder { * @brief Feedback message constructor. * * @param type Type of the message. + * @param query_id The ID of the query. * @param rel_op_index Relational operator index. * @param payload Blob of payload. * @param payload_size Size of the payload blob. * @param ownership Whether to take ownership of the payload blob. */ FeedbackMessage(const FeedbackMessageType type, + const std::size_t query_id, const std::size_t rel_op_index, void *payload, const std::size_t payload_size, const bool ownership = true) - : header_(rel_op_index, payload_size, type), + : header_(query_id, rel_op_index, payload_size, type), payload_(payload), ownership_(ownership) {} @@ -285,6 +292,13 @@ class WorkOrder { " receiver thread with TMB client ID " << receiver_id; } + /** + * @brief Get the ID of the query which this WorkOder belongs to. + **/ + inline const std::size_t getQueryID() const { + return query_id_; + } + protected: /** * @brief Constructor. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/AggregationOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp index fdcc54f..fd4692a 100644 --- a/relational_operators/tests/AggregationOperator_unittest.cpp +++ b/relational_operators/tests/AggregationOperator_unittest.cpp @@ -228,6 +228,8 @@ class AggregationOperatorTest : public ::testing::Test { // Setup the aggregation state proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. + const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size(); serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states(); aggr_state_proto->set_relation_id(table_->getID()); @@ -319,6 +321,8 @@ class AggregationOperatorTest : public ::testing::Test { // Setup the aggregation state proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. + const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size(); serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states(); aggr_state_proto->set_relation_id(table_->getID()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/HashJoinOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp index 074b603..9c34170 100644 --- a/relational_operators/tests/HashJoinOperator_unittest.cpp +++ b/relational_operators/tests/HashJoinOperator_unittest.cpp @@ -294,6 +294,7 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType> TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) { // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. const QueryContext::join_hash_table_id join_hash_table_index = query_context_proto.join_hash_tables_size(); @@ -434,6 +435,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) { TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) { // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. const QueryContext::join_hash_table_id join_hash_table_index = query_context_proto.join_hash_tables_size(); @@ -604,6 +606,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) { TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) { // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. const QueryContext::join_hash_table_id join_hash_table_index = query_context_proto.join_hash_tables_size(); @@ -739,6 +742,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) { TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) { // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. const QueryContext::join_hash_table_id join_hash_table_index = query_context_proto.join_hash_tables_size(); @@ -906,6 +910,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) { TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) { // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. const QueryContext::join_hash_table_id join_hash_table_index = query_context_proto.join_hash_tables_size(); @@ -1083,6 +1088,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) { TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) { // Setup the hash table proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. const QueryContext::join_hash_table_id join_hash_table_index = query_context_proto.join_hash_tables_size(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/SortMergeRunOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp index fc10671..46bce5b 100644 --- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp +++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp @@ -189,6 +189,7 @@ class RunTest : public ::testing::Test { nullptr, storage_manager_.get(), kOpIndex, + 0, // dummy query ID. foreman_client_id_, &bus_)); } @@ -433,6 +434,7 @@ class RunMergerTest : public ::testing::Test { nullptr, storage_manager_.get(), kOpIndex, + 0, // dummy query ID. foreman_client_id_, &bus_)); } @@ -1269,6 +1271,8 @@ class SortMergeRunOperatorTest : public ::testing::Test { ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID()); ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID()); + query_context_proto_.set_query_id(0); // dummy query ID. + // Setup the InsertDestination proto in the query context proto. insert_destination_index_ = query_context_proto_.insert_destinations_size(); serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/SortRunGenerationOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp index 71a80e4..bd682c2 100644 --- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp +++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp @@ -328,6 +328,7 @@ class SortRunGenerationOperatorTest : public ::testing::Test { const std::vector<bool> &null_ordering) { // Setup the InsertDestination proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. const QueryContext::insert_destination_id insert_destination_index = query_context_proto.insert_destinations_size(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/TextScanOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp index 5860745..5bcbee5 100644 --- a/relational_operators/tests/TextScanOperator_unittest.cpp +++ b/relational_operators/tests/TextScanOperator_unittest.cpp @@ -180,6 +180,7 @@ TEST_F(TextScanOperatorTest, ScanTest) { // Setup the InsertDestination proto in the query context proto. serialization::QueryContext query_context_proto; + query_context_proto.set_query_id(0); // dummy query ID. QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size(); serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/storage/InsertDestination.cpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp index 354bed4..2866c5f 100644 --- a/storage/InsertDestination.cpp +++ b/storage/InsertDestination.cpp @@ -60,6 +60,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation, const StorageBlockLayout *layout, StorageManager *storage_manager, const std::size_t relational_op_index, + const std::size_t query_id, const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) : thread_id_map_(*ClientIDMap::Instance()), @@ -67,6 +68,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation, relation_(relation), layout_(layout), relational_op_index_(relational_op_index), + query_id_(query_id), scheduler_client_id_(scheduler_client_id), bus_(DCHECK_NOTNULL(bus)) { if (layout_ == nullptr) { @@ -74,11 +76,13 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation, } } -InsertDestination* InsertDestination::ReconstructFromProto(const serialization::InsertDestination &proto, - const CatalogRelationSchema &relation, - StorageManager *storage_manager, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus) { +InsertDestination* InsertDestination::ReconstructFromProto( + const std::size_t query_id, + const serialization::InsertDestination &proto, + const CatalogRelationSchema &relation, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { DCHECK(ProtoIsValid(proto, relation)); StorageBlockLayout *layout = nullptr; @@ -93,6 +97,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization:: layout, storage_manager, proto.relational_op_index(), + query_id, scheduler_client_id, bus); } @@ -107,6 +112,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization:: storage_manager, move(blocks), proto.relational_op_index(), + query_id, scheduler_client_id, bus); } @@ -134,6 +140,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization:: storage_manager, move(partitions), proto.relational_op_index(), + query_id, scheduler_client_id, bus); } @@ -262,6 +269,7 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() { serialization::CatalogRelationNewBlockMessage proto; proto.set_relation_id(relation_.getID()); proto.set_block_id(new_id); + proto.set_query_id(getQueryID()); const size_t proto_length = proto.ByteSize(); char *proto_bytes = static_cast<char*>(malloc(proto_length)); @@ -309,6 +317,7 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() { serialization::CatalogRelationNewBlockMessage proto; proto.set_relation_id(relation_.getID()); proto.set_block_id(new_id); + proto.set_query_id(getQueryID()); const size_t proto_length = proto.ByteSize(); char *proto_bytes = static_cast<char*>(malloc(proto_length)); @@ -385,21 +394,29 @@ const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInterna return done_block_ids_; } -PartitionAwareInsertDestination::PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header, - const CatalogRelationSchema &relation, - const StorageBlockLayout *layout, - StorageManager *storage_manager, - vector<vector<block_id>> &&partitions, - const std::size_t relational_op_index, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus) - : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus), +PartitionAwareInsertDestination::PartitionAwareInsertDestination( + PartitionSchemeHeader *partition_scheme_header, + const CatalogRelationSchema &relation, + const StorageBlockLayout *layout, + StorageManager *storage_manager, + vector<vector<block_id>> &&partitions, + const std::size_t relational_op_index, + const std::size_t query_id, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) + : InsertDestination(relation, + layout, + storage_manager, + relational_op_index, + query_id, + scheduler_client_id, + bus), partition_scheme_header_(DCHECK_NOTNULL(partition_scheme_header)), available_block_refs_(partition_scheme_header_->getNumPartitions()), available_block_ids_(move(partitions)), done_block_ids_(partition_scheme_header_->getNumPartitions()), - mutexes_for_partition_(new SpinMutex[partition_scheme_header_->getNumPartitions()]) { -} + mutexes_for_partition_( + new SpinMutex[partition_scheme_header_->getNumPartitions()]) {} MutableBlockReference PartitionAwareInsertDestination::createNewBlock() { FATAL_ERROR("PartitionAwareInsertDestination::createNewBlock needs a partition id as an argument."); @@ -415,6 +432,7 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition proto.set_relation_id(relation_.getID()); proto.set_block_id(new_id); proto.set_partition_id(part_id); + proto.set_query_id(getQueryID()); const size_t proto_length = proto.ByteSize(); char *proto_bytes = static_cast<char*>(malloc(proto_length)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/storage/InsertDestination.hpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp index 670cd6c..5ff33f5 100644 --- a/storage/InsertDestination.hpp +++ b/storage/InsertDestination.hpp @@ -78,6 +78,7 @@ class InsertDestination : public InsertDestinationInterface { * @param storage_manager The StorageManager to use. * @param relational_op_index The index of the relational operator in the * QueryPlan DAG that has outputs. + * @param query_id The ID of this query. * @param scheduler_client_id The TMB client ID of the scheduler thread. * @param bus A pointer to the TMB. **/ @@ -85,6 +86,7 @@ class InsertDestination : public InsertDestinationInterface { const StorageBlockLayout *layout, StorageManager *storage_manager, const std::size_t relational_op_index, + const std::size_t query_id, const tmb::client_id scheduler_client_id, tmb::MessageBus *bus); @@ -98,6 +100,7 @@ class InsertDestination : public InsertDestinationInterface { * @brief A factory method to generate the InsertDestination from the * serialized Protocol Buffer representation. * + * @param query_id The ID of this query. * @param proto A serialized Protocol Buffer representation of an * InsertDestination, originally generated by the optimizer. * @param relation The relation to insert tuples into. @@ -107,11 +110,13 @@ class InsertDestination : public InsertDestinationInterface { * * @return The constructed InsertDestination. */ - static InsertDestination* ReconstructFromProto(const serialization::InsertDestination &proto, - const CatalogRelationSchema &relation, - StorageManager *storage_manager, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus); + static InsertDestination* ReconstructFromProto( + const std::size_t query_id, + const serialization::InsertDestination &proto, + const CatalogRelationSchema &relation, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus); /** * @brief Check whether a serialized InsertDestination is fully-formed and @@ -211,6 +216,7 @@ class InsertDestination : public InsertDestinationInterface { proto.set_operator_index(relational_op_index_); proto.set_block_id(id); proto.set_relation_id(relation_.getID()); + proto.set_query_id(query_id_); // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const std::size_t proto_length = proto.ByteSize(); @@ -253,6 +259,10 @@ class InsertDestination : public InsertDestinationInterface { " ID " << scheduler_client_id_; } + inline const std::size_t getQueryID() const { + return query_id_; + } + const ClientIDMap &thread_id_map_; StorageManager *storage_manager_; @@ -260,6 +270,7 @@ class InsertDestination : public InsertDestinationInterface { std::unique_ptr<const StorageBlockLayout> layout_; const std::size_t relational_op_index_; + const std::size_t query_id_; tmb::client_id scheduler_client_id_; tmb::MessageBus *bus_; @@ -288,10 +299,16 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination { const StorageBlockLayout *layout, StorageManager *storage_manager, const std::size_t relational_op_index, + const std::size_t query_id, const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) - : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) { - } + : InsertDestination(relation, + layout, + storage_manager, + relational_op_index, + query_id, + scheduler_client_id, + bus) {} ~AlwaysCreateBlockInsertDestination() override { } @@ -334,16 +351,23 @@ class BlockPoolInsertDestination : public InsertDestination { * @param relational_op_index The index of the relational operator in the * QueryPlan DAG that has outputs. * @param scheduler_client_id The TMB client ID of the scheduler thread. + * @param query_id The ID of the query. * @param bus A pointer to the TMB. **/ BlockPoolInsertDestination(const CatalogRelationSchema &relation, const StorageBlockLayout *layout, StorageManager *storage_manager, const std::size_t relational_op_index, + const std::size_t query_id, const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) - : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) { - } + : InsertDestination(relation, + layout, + storage_manager, + relational_op_index, + query_id, + scheduler_client_id, + bus) {} /** * @brief Constructor. @@ -363,9 +387,16 @@ class BlockPoolInsertDestination : public InsertDestination { StorageManager *storage_manager, std::vector<block_id> &&blocks, const std::size_t relational_op_index, + const std::size_t query_id, const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) - : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus), + : InsertDestination(relation, + layout, + storage_manager, + relational_op_index, + query_id, + scheduler_client_id, + bus), available_block_ids_(std::move(blocks)) { // TODO(chasseur): Once block fill statistics are available, replace this // with something smarter. @@ -386,7 +417,6 @@ class BlockPoolInsertDestination : public InsertDestination { MutableBlockReference createNewBlock() override; private: - FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest); FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest); // A vector of references to blocks which are loaded in memory. @@ -416,17 +446,20 @@ class PartitionAwareInsertDestination : public InsertDestination { * @param partitions The blocks in partitions. * @param relational_op_index The index of the relational operator in the * QueryPlan DAG that has outputs. + * @param query_id The ID of the query. * @param scheduler_client_id The TMB client ID of the scheduler thread. * @param bus A pointer to the TMB. **/ - PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header, - const CatalogRelationSchema &relation, - const StorageBlockLayout *layout, - StorageManager *storage_manager, - std::vector<std::vector<block_id>> &&partitions, - const std::size_t relational_op_index, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus); + PartitionAwareInsertDestination( + PartitionSchemeHeader *partition_scheme_header, + const CatalogRelationSchema &relation, + const StorageBlockLayout *layout, + StorageManager *storage_manager, + std::vector<std::vector<block_id>> &&partitions, + const std::size_t relational_op_index, + const std::size_t query_id, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus); ~PartitionAwareInsertDestination() override { delete[] mutexes_for_partition_;