New operator to destroy aggregation state.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2e8e1c3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2e8e1c3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2e8e1c3d Branch: refs/heads/partitioned-aggregation Commit: 2e8e1c3d266cfb985290e3596e00c499e5a4a7be Parents: 6ecda1f Author: Harshad Deshmukh <hbdeshm...@apache.org> Authored: Sun Aug 21 09:32:51 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Tue Sep 6 15:01:34 2016 -0500 ---------------------------------------------------------------------- query_execution/QueryContext.hpp | 13 ++- relational_operators/CMakeLists.txt | 12 ++ .../DestroyAggregationStateOperator.cpp | 64 +++++++++++ .../DestroyAggregationStateOperator.hpp | 112 +++++++++++++++++++ relational_operators/WorkOrder.proto | 7 ++ 5 files changed, 207 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2e8e1c3d/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index c54c7ff..6a928e8 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -184,7 +184,7 @@ class QueryContext { /** * @brief Release the given AggregationOperationState. * - * @param id The id of the AggregationOperationState to destroy. + * @param id The id of the AggregationOperationState to release. * * @return The AggregationOperationState, already created in the constructor. **/ @@ -195,6 +195,17 @@ class QueryContext { } /** + * @brief Destroy the given aggregation state. + * + * @param id The ID of the AggregationOperationState to destroy. 
+ **/ + inline void destroyAggregationState(const aggregation_state_id id) { + DCHECK_LT(id, aggregation_states_.size()); + DCHECK(aggregation_states_[id]); + aggregation_states_[id].reset(nullptr); + } + + /** * @brief Whether the given BloomFilter id is valid. * * @param id The BloomFilter id. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2e8e1c3d/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 43a42f9..369deba 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -34,6 +34,7 @@ add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cp add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp) add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp) add_library(quickstep_relationaloperators_DeleteOperator DeleteOperator.cpp DeleteOperator.hpp) +add_library(quickstep_relationaloperators_DestroyAggregationStateOperator DestroyAggregationStateOperator.cpp DestroyAggregationStateOperator.hpp) add_library(quickstep_relationaloperators_DestroyHashOperator DestroyHashOperator.cpp DestroyHashOperator.hpp) add_library(quickstep_relationaloperators_DropTableOperator DropTableOperator.cpp DropTableOperator.hpp) add_library(quickstep_relationaloperators_FinalizeAggregationOperator @@ -146,6 +147,16 @@ target_link_libraries(quickstep_relationaloperators_DestroyHashOperator quickstep_relationaloperators_WorkOrder_proto quickstep_utility_Macros tmb) +target_link_libraries(quickstep_relationaloperators_DestroyAggregationStateOperator + glog + quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + quickstep_queryexecution_WorkOrdersContainer + quickstep_relationaloperators_RelationalOperator + 
quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto + quickstep_utility_Macros + tmb) target_link_libraries(quickstep_relationaloperators_DropTableOperator glog quickstep_catalog_CatalogDatabase @@ -484,6 +495,7 @@ target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_CreateIndexOperator quickstep_relationaloperators_CreateTableOperator quickstep_relationaloperators_DeleteOperator + quickstep_relationaloperators_DestroyAggregationStateOperator quickstep_relationaloperators_DestroyHashOperator quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2e8e1c3d/relational_operators/DestroyAggregationStateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp new file mode 100644 index 0000000..62ca9e7 --- /dev/null +++ b/relational_operators/DestroyAggregationStateOperator.cpp @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#include "relational_operators/DestroyAggregationStateOperator.hpp" + +#include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +bool DestroyAggregationStateOperator::getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { + if (blocking_dependencies_met_ && !work_generated_) { + work_generated_ = true; + container->addNormalWorkOrder( + new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, query_context), + op_index_); + } + return work_generated_; +} + +bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (blocking_dependencies_met_ && !work_generated_) { + work_generated_ = true; + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE); + proto->set_query_id(query_id_); + proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_); + + container->addWorkOrderProto(proto, op_index_); + } + return work_generated_; +} + +void DestroyAggregationStateWorkOrder::execute() { + query_context_->destroyAggregationState(aggr_state_index_); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2e8e1c3d/relational_operators/DestroyAggregationStateOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp new file mode 100644 index 0000000..b9a74ec --- /dev/null +++ b/relational_operators/DestroyAggregationStateOperator.hpp @@ -0,0 +1,112 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_ +#define QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_ + +#include "query_execution/QueryContext.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class StorageManager; +class WorkOrderProtosContainer; +class WorkOrdersContainer; + +/** \addtogroup RelationalOperators + * @{ + */ + +/** + * @brief An operator which destroys a shared aggregation state. + **/ +class DestroyAggregationStateOperator : public RelationalOperator { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of the query to which this operator belongs. + * @param aggr_state_index The index of the AggregationState in QueryContext. 
+ **/ + DestroyAggregationStateOperator(const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index) + : RelationalOperator(query_id), + aggr_state_index_(aggr_state_index), + work_generated_(false) {} + + ~DestroyAggregationStateOperator() override {} + + bool getAllWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) override; + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + + private: + const QueryContext::aggregation_state_id aggr_state_index_; + bool work_generated_; + + DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator); +}; + +/** + * @brief A WorkOrder produced by DestroyAggregationStateOperator. + **/ +class DestroyAggregationStateWorkOrder : public WorkOrder { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of the query to which this WorkOrder belongs. + * @param aggr_state_index The index of the AggregationState in QueryContext. + * @param query_context The QueryContext to use. 
+ **/ + DestroyAggregationStateWorkOrder(const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index, + QueryContext *query_context) + : WorkOrder(query_id), + aggr_state_index_(aggr_state_index), + query_context_(DCHECK_NOTNULL(query_context)) {} + + ~DestroyAggregationStateWorkOrder() override {} + + void execute() override; + + private: + const QueryContext::aggregation_state_id aggr_state_index_; + QueryContext *query_context_; + + DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateWorkOrder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2e8e1c3d/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index 02aa50e..3eed379 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -42,6 +42,7 @@ enum WorkOrderType { TEXT_SCAN = 18; UPDATE = 19; WINDOW_AGGREGATION = 20; + DESTROY_AGGREGATION_STATE = 21; } message WorkOrder { @@ -253,3 +254,9 @@ message WindowAggregationWorkOrder { optional int32 insert_destination_index = 338; } } + +message DestroyAggregationStateWorkOrder { + extend WorkOrder { + optional uint32 aggr_state_index = 339; + } +}