Parallel work order generation support.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bc524a97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bc524a97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bc524a97 Branch: refs/heads/partitioned-aggregation Commit: bc524a97375edbfd748945c69cf9b2928c4944a1 Parents: 3b5b8a9 Author: Harshad Deshmukh <hbdeshm...@apache.org> Authored: Fri Aug 19 10:35:09 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Tue Sep 20 12:57:06 2016 -0500 ---------------------------------------------------------------------- .../FinalizeAggregationOperator.cpp | 35 ++++++++++++++++---- .../FinalizeAggregationOperator.hpp | 10 ++++-- 2 files changed, 36 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc524a97/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 7e337de..55d1357 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders( if (blocking_dependencies_met_ && !started_) { started_ = true; - container->addNormalWorkOrder( - new FinalizeAggregationWorkOrder( - query_id_, - query_context->getAggregationState(aggr_state_index_), - query_context->getInsertDestination(output_destination_index_)), - op_index_); + DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr); + if (query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned()) { + // The same AggregationState is shared across all the WorkOrders. + for (std::size_t part_id = 0; + part_id < query_context->getAggregationState(aggr_state_index_) + ->getNumPartitions(); + ++part_id) { + container->addNormalWorkOrder( + new FinalizeAggregationWorkOrder( + query_id_, + query_context->getAggregationState(aggr_state_index_), + query_context->getInsertDestination(output_destination_index_), + static_cast<int>(part_id)), + op_index_); + } + } else { + container->addNormalWorkOrder( + new FinalizeAggregationWorkOrder( + query_id_, + query_context->getAggregationState(aggr_state_index_), + query_context->getInsertDestination(output_destination_index_)), + op_index_); + } } return started_; } @@ -70,7 +87,11 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer void FinalizeAggregationWorkOrder::execute() { - state_->finalizeAggregate(output_destination_); + if (state_->isAggregatePartitioned()) { + state_->finalizeAggregatePartitioned(part_id_, output_destination_); + } else { + state_->finalizeAggregate(output_destination_); + } } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc524a97/relational_operators/FinalizeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp index 0aeac2a..7517d58 100644 --- a/relational_operators/FinalizeAggregationOperator.hpp +++ b/relational_operators/FinalizeAggregationOperator.hpp @@ -119,13 +119,18 @@ class FinalizeAggregationWorkOrder : public WorkOrder { * @param state The AggregationState to use. * @param output_destination The InsertDestination to insert aggregation * results. + * @param part_id The partition ID for which the Finalize aggregation work + * order is issued. Ignore this field if the aggregation is not + * partitioned. */ FinalizeAggregationWorkOrder(const std::size_t query_id, AggregationOperationState *state, - InsertDestination *output_destination) + InsertDestination *output_destination, + int part_id = -1) : WorkOrder(query_id), state_(DCHECK_NOTNULL(state)), - output_destination_(DCHECK_NOTNULL(output_destination)) {} + output_destination_(DCHECK_NOTNULL(output_destination)), + part_id_(part_id) {} ~FinalizeAggregationWorkOrder() override {} @@ -134,6 +139,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder { private: AggregationOperationState *state_; InsertDestination *output_destination_; + const int part_id_; DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder); };