Repository: incubator-quickstep
Updated Branches:
  refs/heads/partitioned-aggregation 63f66a242 -> be43958fc (forced update)


Introduced DestroyAggregationState operator

- Similar to the pattern with DestroyHash, this operator destroys the
  AggregationState once the Finalize aggregation operator finishes its
  execution.
- Optimizer support for DestroyAggregationState operator.
- Removed unused QueryContext::releaseAggregationState method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/590ba4da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/590ba4da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/590ba4da

Branch: refs/heads/partitioned-aggregation
Commit: 590ba4dacfe0c65a1d254e79daaecbf4f1f5854b
Parents: 1d10422
Author: Harshad Deshmukh <hbdeshm...@apache.org>
Authored: Tue Aug 23 11:00:57 2016 -0500
Committer: Harshad Deshmukh <hbdeshm...@apache.org>
Committed: Tue Sep 6 10:21:39 2016 -0500

----------------------------------------------------------------------
 query_execution/QueryContext.hpp                |  10 +-
 query_optimizer/CMakeLists.txt                  |   1 +
 query_optimizer/ExecutionGenerator.cpp          |  10 ++
 relational_operators/CMakeLists.txt             |  16 +++
 .../DestroyAggregationStateOperator.cpp         |  64 ++++++++++
 .../DestroyAggregationStateOperator.hpp         | 120 +++++++++++++++++++
 .../FinalizeAggregationOperator.cpp             |   2 +-
 .../FinalizeAggregationOperator.hpp             |   3 +-
 relational_operators/WorkOrder.proto            |   7 ++
 relational_operators/WorkOrderFactory.cpp       |  16 ++-
 .../tests/AggregationOperator_unittest.cpp      |  25 ++++
 11 files changed, 264 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index c54c7ff..393b55e 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -182,16 +182,14 @@ class QueryContext {
   }
 
   /**
-   * @brief Release the given AggregationOperationState.
+   * @brief Destroy the given aggregation state.
    *
-   * @param id The id of the AggregationOperationState to destroy.
-   *
-   * @return The AggregationOperationState, alreadly created in the 
constructor.
+   * @param id The ID of the AggregationOperationState to destroy.
    **/
-  inline AggregationOperationState* releaseAggregationState(const 
aggregation_state_id id) {
+  inline void destroyAggregationState(const aggregation_state_id id) {
     DCHECK_LT(id, aggregation_states_.size());
     DCHECK(aggregation_states_[id]);
-    return aggregation_states_[id].release();
+    aggregation_states_[id].reset(nullptr);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 56ae52f..32f7885 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -118,6 +118,7 @@ 
target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_CreateIndexOperator
                       quickstep_relationaloperators_CreateTableOperator
                       quickstep_relationaloperators_DeleteOperator
+                      
quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_DestroyHashOperator
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp 
b/query_optimizer/ExecutionGenerator.cpp
index 2e03e09..130134c 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -94,6 +94,7 @@
 #include "relational_operators/CreateIndexOperator.hpp"
 #include "relational_operators/CreateTableOperator.hpp"
 #include "relational_operators/DeleteOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
 #include "relational_operators/DestroyHashOperator.hpp"
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
@@ -1464,6 +1465,15 @@ void ExecutionGenerator::convertAggregate(
       std::forward_as_tuple(finalize_aggregation_operator_index, 
output_relation));
   
temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
                                             output_relation);
+
+  const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+      execution_plan_->addRelationalOperator(
+          new DestroyAggregationStateOperator(query_handle_->query_id(),
+                                              aggr_state_index));
+
+  
execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+                                       finalize_aggregation_operator_index,
+                                       true);
 }
 
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt 
b/relational_operators/CMakeLists.txt
index 43a42f9..cdfe309 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ add_library(quickstep_relationaloperators_AggregationOperator 
AggregationOperato
 add_library(quickstep_relationaloperators_BuildHashOperator 
BuildHashOperator.cpp BuildHashOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator 
CreateIndexOperator.cpp CreateIndexOperator.hpp)
 add_library(quickstep_relationaloperators_CreateTableOperator 
CreateTableOperator.cpp CreateTableOperator.hpp)
+add_library(quickstep_relationaloperators_DestroyAggregationStateOperator 
+            DestroyAggregationStateOperator.cpp 
+            DestroyAggregationStateOperator.hpp)
 add_library(quickstep_relationaloperators_DeleteOperator DeleteOperator.cpp 
DeleteOperator.hpp)
 add_library(quickstep_relationaloperators_DestroyHashOperator 
DestroyHashOperator.cpp DestroyHashOperator.hpp)
 add_library(quickstep_relationaloperators_DropTableOperator 
DropTableOperator.cpp DropTableOperator.hpp)
@@ -136,6 +139,16 @@ 
target_link_libraries(quickstep_relationaloperators_DeleteOperator
                       quickstep_threading_ThreadIDBasedMap
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_relationaloperators_DestroyAggregationStateOperator
+                      glog
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_DestroyHashOperator
                       glog
                       quickstep_queryexecution_QueryContext
@@ -451,6 +464,7 @@ 
target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_AggregationOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_DeleteOperator
+                      
quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_DestroyHashOperator
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
@@ -484,6 +498,7 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_CreateIndexOperator
                       quickstep_relationaloperators_CreateTableOperator
                       quickstep_relationaloperators_DeleteOperator
+                      
quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_DestroyHashOperator
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
@@ -533,6 +548,7 @@ target_link_libraries(AggregationOperator_unittest
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_AggregationOperator
+                      
quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_storage_AggregationOperationState_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp 
b/relational_operators/DestroyAggregationStateOperator.cpp
new file mode 100644
index 0000000..62ca9e7
--- /dev/null
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool DestroyAggregationStateOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (blocking_dependencies_met_ && !work_generated_) {
+    work_generated_ = true;
+    container->addNormalWorkOrder(
+        new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, 
query_context),
+        op_index_);
+  }
+  return work_generated_;
+}
+
+bool 
DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer 
*container) {
+  if (blocking_dependencies_met_ && !work_generated_) {
+    work_generated_ = true;
+
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
+    proto->set_query_id(query_id_);
+    
proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index,
 aggr_state_index_);
+
+    container->addWorkOrderProto(proto, op_index_);
+  }
+  return work_generated_;
+}
+
+void DestroyAggregationStateWorkOrder::execute() {
+  query_context_->destroyAggregationState(aggr_state_index_);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/DestroyAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.hpp 
b/relational_operators/DestroyAggregationStateOperator.hpp
new file mode 100644
index 0000000..bfb5ff1
--- /dev/null
+++ b/relational_operators/DestroyAggregationStateOperator.hpp
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_
+
+#include <string>
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which destroys a shared aggregation state.
+ **/
+class DestroyAggregationStateOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param aggr_state_index The index of the AggregationState in QueryContext.
+   **/
+  DestroyAggregationStateOperator(
+      const std::size_t query_id,
+      const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        aggr_state_index_(aggr_state_index),
+        work_generated_(false) {}
+
+  ~DestroyAggregationStateOperator() override {}
+
+  std::string getName() const override {
+    return "DestroyAggregationStateOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+  const QueryContext::aggregation_state_id aggr_state_index_;
+  bool work_generated_;
+
+  DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by DestroyAggregationStateOperator.
+ **/
+class DestroyAggregationStateWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param aggr_state_index The index of the AggregationState in QueryContext.
+   * @param query_context The QueryContext to use.
+   **/
+  DestroyAggregationStateWorkOrder(
+      const std::size_t query_id,
+      const QueryContext::aggregation_state_id aggr_state_index,
+      QueryContext *query_context)
+      : WorkOrder(query_id),
+        aggr_state_index_(aggr_state_index),
+        query_context_(DCHECK_NOTNULL(query_context)) {}
+
+  ~DestroyAggregationStateWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const QueryContext::aggregation_state_id aggr_state_index_;
+  QueryContext *query_context_;
+
+  DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // 
QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp 
b/relational_operators/FinalizeAggregationOperator.cpp
index 65e62c4..7e337de 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,7 +44,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
     container->addNormalWorkOrder(
         new FinalizeAggregationWorkOrder(
             query_id_,
-            query_context->releaseAggregationState(aggr_state_index_),
+            query_context->getAggregationState(aggr_state_index_),
             query_context->getInsertDestination(output_destination_index_)),
         op_index_);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp 
b/relational_operators/FinalizeAggregationOperator.hpp
index 7ac6712..0aeac2a 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -22,7 +22,6 @@
 
 #include <cstddef>
 #include <string>
-#include <memory>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
@@ -133,7 +132,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
-  std::unique_ptr<AggregationOperationState> state_;
+  AggregationOperationState *state_;
   InsertDestination *output_destination_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto 
b/relational_operators/WorkOrder.proto
index 02aa50e..3eed379 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -42,6 +42,7 @@ enum WorkOrderType {
   TEXT_SCAN = 18;
   UPDATE = 19;
   WINDOW_AGGREGATION = 20;
+  DESTROY_AGGREGATION_STATE = 21;
 }
 
 message WorkOrder {
@@ -253,3 +254,9 @@ message WindowAggregationWorkOrder {
     optional int32 insert_destination_index = 338;
   }
 }
+
+message DestroyAggregationStateWorkOrder {
+  extend WorkOrder {
+    optional uint32 aggr_state_index = 339;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp 
b/relational_operators/WorkOrderFactory.cpp
index 6970486..2356bab 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -30,6 +30,7 @@
 #include "relational_operators/AggregationOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
 #include "relational_operators/DeleteOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
 #include "relational_operators/DestroyHashOperator.hpp"
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
@@ -116,6 +117,14 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const 
serialization::WorkOrder
           shiftboss_client_id,
           bus);
     }
+    case serialization::DESTROY_AGGREGATION_STATE: {
+      LOG(INFO) << "Creating DestroyAggregationStateWorkOrder";
+      return new DestroyAggregationStateWorkOrder(
+          proto.query_id(),
+          proto.GetExtension(
+              
serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
+          query_context);
+    }
     case serialization::DESTROY_HASH: {
       LOG(INFO) << "Creating DestroyHashWorkOrder";
       return new DestroyHashWorkOrder(
@@ -145,7 +154,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const 
serialization::WorkOrder
       LOG(INFO) << "Creating FinalizeAggregationWorkOrder";
       return new FinalizeAggregationWorkOrder(
           proto.query_id(),
-          query_context->releaseAggregationState(proto.GetExtension(
+          query_context->getAggregationState(proto.GetExtension(
               serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::FinalizeAggregationWorkOrder::
@@ -489,6 +498,11 @@ bool WorkOrderFactory::ProtoIsValid(const 
serialization::WorkOrder &proto,
              proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
              
proto.HasExtension(serialization::DeleteWorkOrder::operator_index);
     }
+    case serialization::DESTROY_AGGREGATION_STATE: {
+      return 
proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index)
 &&
+             query_context.isValidAggregationStateId(
+                 
proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index));
+    }
     case serialization::DESTROY_HASH: {
       return 
proto.HasExtension(serialization::DestroyHashWorkOrder::join_hash_table_index) 
&&
              query_context.isValidJoinHashTableId(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp 
b/relational_operators/tests/AggregationOperator_unittest.cpp
index 7a5b461..0138362 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -44,6 +44,7 @@
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/DestroyAggregationStateOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/AggregationOperationState.pb.h"
@@ -292,6 +293,9 @@ class AggregationOperatorTest : public ::testing::Test {
                                         *result_table_,
                                         insert_destination_index));
 
+    destroy_aggr_state_op_.reset(
+        new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,
                                           *db_,
@@ -304,6 +308,7 @@ class AggregationOperatorTest : public ::testing::Test {
     // class' checks about operator index are successful.
     op_->setOperatorIndex(kOpIndex);
     finalize_op_->setOperatorIndex(kOpIndex);
+    destroy_aggr_state_op_->setOperatorIndex(kOpIndex);
   }
 
   void setupTestGroupBy(const std::string &stem,
@@ -379,6 +384,9 @@ class AggregationOperatorTest : public ::testing::Test {
                                         *result_table_,
                                         insert_destination_index));
 
+    destroy_aggr_state_op_.reset(
+        new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,
                                           *db_,
@@ -391,6 +399,7 @@ class AggregationOperatorTest : public ::testing::Test {
     // class' checks about operator index are successful.
     op_->setOperatorIndex(kOpIndex);
     finalize_op_->setOperatorIndex(kOpIndex);
+    destroy_aggr_state_op_->setOperatorIndex(kOpIndex);
   }
 
   void execute() {
@@ -423,6 +432,21 @@ class AggregationOperatorTest : public ::testing::Test {
       work_order->execute();
       delete work_order;
     }
+
+    destroy_aggr_state_op_->informAllBlockingDependenciesMet();
+
+    WorkOrdersContainer destroy_aggr_state_op_container(1, 0);
+    const std::size_t destroy_aggr_state_op_index = 0;
+    destroy_aggr_state_op_->getAllWorkOrders(&destroy_aggr_state_op_container,
+                                             query_context_.get(),
+                                             storage_manager_.get(),
+                                             foreman_client_id_,
+                                             &bus_);
+    while 
(destroy_aggr_state_op_container.hasNormalWorkOrder(destroy_aggr_state_op_index))
 {
+      WorkOrder *work_order = 
destroy_aggr_state_op_container.getNormalWorkOrder(destroy_aggr_state_op_index);
+      work_order->execute();
+      delete work_order;
+    }
   }
 
   template <class T>
@@ -528,6 +552,7 @@ class AggregationOperatorTest : public ::testing::Test {
 
   std::unique_ptr<AggregationOperator> op_;
   std::unique_ptr<FinalizeAggregationOperator> finalize_op_;
+  std::unique_ptr<DestroyAggregationStateOperator> destroy_aggr_state_op_;
 };
 
 const char AggregationOperatorTest::kDatabaseName[] = "database";

Reply via email to