http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp 
b/storage/AggregationOperationState.hpp
index cbbfc22..9fa3bd2 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -34,6 +34,7 @@
 #include "storage/HashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
 
 namespace quickstep {
 
@@ -167,8 +168,8 @@ class AggregationOperationState {
    **/
   void finalizeAggregate(InsertDestination *output_destination);
 
-  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                     AggregationStateHashTableBase *dst);
+  static void mergeGroupByHashTables(AggregationStateHashTableBase 
*destination_hash_table,
+                                     const AggregationStateHashTableBase 
*source_hash_table);
 
   int dflag;
 
@@ -176,7 +177,7 @@ class AggregationOperationState {
   // Merge locally (per storage block) aggregated states with global 
aggregation
   // states.
   void mergeSingleState(
-      const std::vector<std::unique_ptr<AggregationState>> &local_state);
+      const std::vector<ScopedBuffer> &local_state);
 
   // Aggregate on input block.
   void aggregateBlockSingleState(const block_id input_block);
@@ -201,10 +202,6 @@ class AggregationOperationState {
   // arguments.
   std::vector<bool> is_distinct_;
 
-  // Hash table for obtaining distinct (i.e. unique) arguments.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-      distinctify_hashtables_;
-
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all an aggregate's argument expressions are simply attributes in
   // 'input_relation_', then this caches the attribute IDs of those arguments.
@@ -212,14 +209,7 @@ class AggregationOperationState {
 #endif
 
   // Per-aggregate global states for aggregation without GROUP BY.
-  std::vector<std::unique_ptr<AggregationState>> single_states_;
-
-  // Per-aggregate HashTables for aggregation with GROUP BY.
-  //
-  // TODO(shoban): We should ideally store the aggregation state together in 
one
-  // hash table to prevent multiple lookups.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-      group_by_hashtables_;
+  std::vector<ScopedBuffer> single_states_;
 
   // A vector of group by hash table pools.
   std::unique_ptr<HashTablePool> group_by_hashtable_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationResultIterator.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationResultIterator.hpp 
b/storage/AggregationResultIterator.hpp
new file mode 100644
index 0000000..259c533
--- /dev/null
+++ b/storage/AggregationResultIterator.hpp
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_RESULT_ITERATOR_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_RESULT_ITERATOR_HPP_
+
+#include <cstddef>
+#include <limits>
+#include <vector>
+
+#include "storage/AggregationStateManager.hpp"
+#include "storage/HashTableUntypedKeyManager.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief Cursor over a contiguous array of aggregation hash table buckets.
+ *        For the bucket at the current position it can copy out the key and
+ *        write the finalized aggregation results.
+ *
+ * @note Only a pointer to the buckets and references to the two managers are
+ *       stored; nothing is copied or owned here, so the arguments presumably
+ *       have to outlive the iterator (confirm against callers).
+ **/
+class AggregationResultIterator {
+ public:
+  /**
+   * @brief Constructor. The iterator starts in the same "before the first
+   *        entry" state that beginIteration() establishes.
+   *
+   * @param buckets Start of the bucket array.
+   * @param bucket_size Size of a single bucket in bytes.
+   * @param num_entries Number of buckets to iterate over.
+   * @param key_manager Used to locate and copy the key inside a bucket.
+   * @param state_manager Used to finalize the states inside a bucket.
+   **/
+  AggregationResultIterator(
+      const void *buckets,
+      const std::size_t bucket_size,
+      const std::size_t num_entries,
+      const HashTableUntypedKeyManager &key_manager,
+      const AggregationStateManager<false> &state_manager)
+      : current_position_(std::numeric_limits<std::size_t>::max()),
+        buckets_(buckets),
+        bucket_size_(bucket_size),
+        num_entries_(num_entries),
+        key_manager_(key_manager),
+        state_manager_(state_manager) {}
+
+  /**
+   * @return The key size in bytes as reported by the key manager.
+   **/
+  inline std::size_t getKeySize() const {
+    return key_manager_.getFixedKeySize();
+  }
+
+  /**
+   * @return The total size in bytes of the finalized results of one entry.
+   **/
+  inline std::size_t getResultsSize() const {
+    return state_manager_.getResultsSizeInBytes();
+  }
+
+  /**
+   * @brief Rewind to the position before the first entry.
+   **/
+  inline void beginIteration() {
+    // max() is the "before the first entry" sentinel: the unsigned increment
+    // in next() wraps it around to position 0.
+    current_position_ = std::numeric_limits<std::size_t>::max();
+  }
+
+  /**
+   * @return True if there is no entry after the current position.
+   **/
+  inline bool iterationFinished() const {
+    // Relies on unsigned wrap-around when positioned before the first entry.
+    return current_position_ + 1 >= num_entries_;
+  }
+
+  /**
+   * @brief Advance to the next entry.
+   *
+   * @return True if the new position refers to a valid entry.
+   **/
+  inline bool next() {
+    ++current_position_;
+    return current_position_ < num_entries_;
+  }
+
+  /**
+   * @brief Step back by one position.
+   **/
+  inline void previous() {
+    --current_position_;
+  }
+
+  /**
+   * @brief Copy the key of the current entry to destination.
+   *
+   * @param destination Memory to copy the key into.
+   **/
+  inline void writeKeyTo(void *destination) const {
+    key_manager_.copyUntypedKey(
+        destination,
+        key_manager_.getUntypedKeyComponent(getCurrentBucket()));
+  }
+
+  /**
+   * @brief Finalize the states of the current entry into destination.
+   *
+   * @param destination Memory to write the finalized results into.
+   **/
+  inline void writeResultsTo(void *destination) const {
+    state_manager_.finalizeStates(destination, getCurrentBucket());
+  }
+
+ private:
+  // Address of the bucket at current_position_.
+  inline const void* getCurrentBucket() const {
+    return static_cast<const char *>(buckets_)
+        + current_position_ * bucket_size_;
+  }
+
+  friend class ThreadPrivateAggregationStateHashTable;
+
+  std::size_t current_position_;
+
+  const void *buckets_;
+  const std::size_t bucket_size_;
+  const std::size_t num_entries_;
+  const HashTableUntypedKeyManager &key_manager_;
+  const AggregationStateManager<false> &state_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(AggregationResultIterator);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_AGGREGATION_RESULT_ITERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationStateHashTable.hpp 
b/storage/AggregationStateHashTable.hpp
new file mode 100644
index 0000000..85a6bdc
--- /dev/null
+++ b/storage/AggregationStateHashTable.hpp
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/AggregationResultIterator.hpp"
+#include "storage/AggregationStateManager.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableUntypedKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFunctors.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/InlineMemcpy.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief Aggregation hash table for GROUP BY that stores one fixed-size
+ *        bucket per distinct key in a StorageBlob and indexes the buckets
+ *        with a std::unordered_map keyed by pointers to the stored keys.
+ *
+ * @note No method of this class takes a lock, and the payload manager is
+ *       instantiated as AggregationStateManager<false>.
+ **/
+class ThreadPrivateAggregationStateHashTable
+    : public AggregationStateHashTableBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param key_types Types of the GROUP BY key components.
+   * @param num_entries Estimated number of distinct keys; used to size the
+   *        slot index and the initial bucket blob.
+   * @param handles One AggregationHandle per aggregate stored in a bucket.
+   * @param storage_manager StorageManager the bucket blobs are created from.
+   **/
+  ThreadPrivateAggregationStateHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager)
+      : payload_manager_(handles),
+        key_types_(key_types),
+        key_manager_(this->key_types_,
+                     payload_manager_.getStatesSizeInBytes()),
+        slots_(num_entries * kHashTableLoadFactor,
+               key_manager_.getUntypedKeyHashFunctor(),
+               key_manager_.getUntypedKeyEqualityFunctor()),
+        bucket_size_(
+            ComputeBucketSize(key_manager_.getFixedKeySize(),
+                              payload_manager_.getStatesSizeInBytes())),
+        buckets_allocated_(0),
+        storage_manager_(storage_manager) {
+    // SlotsNeededForBytes() expects a byte count, so convert the entry
+    // estimate to bytes first. Always request at least one slot so that an
+    // estimate of zero still yields a usable blob.
+    const std::size_t num_storage_slots = std::max<std::size_t>(
+        1,
+        this->storage_manager_->SlotsNeededForBytes(
+            num_entries * bucket_size_));
+
+    // Get a StorageBlob to hold the hash table.
+    const block_id blob_id =
+        this->storage_manager_->createBlob(num_storage_slots);
+    this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+    buckets_ = this->blob_->getMemoryMutable();
+    num_buckets_ = num_storage_slots * kSlotSizeBytes / bucket_size_;
+  }
+
+  ~ThreadPrivateAggregationStateHashTable() {}
+
+  /**
+   * @return The number of buckets handed out so far.
+   **/
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+  /**
+   * @return The key size in bytes as reported by the key manager.
+   **/
+  inline std::size_t getKeySizeInBytes() const {
+    return key_manager_.getFixedKeySize();
+  }
+
+  /**
+   * @return The total size in bytes of the aggregation states of one entry.
+   **/
+  inline std::size_t getStatesSizeInBytes() const {
+    return payload_manager_.getStatesSizeInBytes();
+  }
+
+  /**
+   * @return The total size in bytes of the finalized results of one entry.
+   **/
+  inline std::size_t getResultsSizeInBytes() const {
+    return payload_manager_.getResultsSizeInBytes();
+  }
+
+  /**
+   * @brief Create an iterator over the buckets allocated so far.
+   *
+   * @return A heap-allocated iterator; the caller is responsible for
+   *         deleting it.
+   **/
+  AggregationResultIterator* createResultIterator() const override {
+    return new AggregationResultIterator(buckets_,
+                                         bucket_size_,
+                                         buckets_allocated_,
+                                         key_manager_,
+                                         payload_manager_);
+  }
+
+  /**
+   * @brief Aggregate every tuple of accessor into the bucket selected by
+   *        its single-attribute key.
+   *
+   * @param accessor Source of the input tuples.
+   * @param key_attr_id Attribute holding the GROUP BY key.
+   * @param argument_ids One argument attribute per aggregate.
+   * @return Always true.
+   **/
+  bool upsertValueAccessor(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const std::vector<attribute_id> &argument_ids) override {
+    if (key_manager_.isKeyNullable()) {
+      return upsertValueAccessorInternal<true>(
+          accessor, key_attr_id, argument_ids);
+    } else {
+      return upsertValueAccessorInternal<false>(
+          accessor, key_attr_id, argument_ids);
+    }
+  }
+
+  // Implementation of upsertValueAccessor(). Tuples whose key is NULL are
+  // skipped when check_for_null_keys is true.
+  template <bool check_for_null_keys>
+  bool upsertValueAccessorInternal(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const std::vector<attribute_id> &argument_ids) {
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+      accessor->beginIteration();
+      while (accessor->next()) {
+        const void *key =
+            accessor->template getUntypedValue<check_for_null_keys>(
+                key_attr_id);
+        if (check_for_null_keys && key == nullptr) {
+          continue;
+        }
+        bool is_empty;
+        void *bucket = locateBucket(key, &is_empty);
+        if (is_empty) {
+          payload_manager_.initializeStates(bucket);
+        }
+        // The tuple that creates a bucket has to be accumulated as well;
+        // otherwise the first row of every group is silently dropped.
+        payload_manager_.template updateStates<check_for_null_keys>(
+            bucket, accessor, argument_ids);
+      }
+      return true;
+    });
+  }
+
+  /**
+   * @brief Aggregate every tuple of accessor into the bucket selected by
+   *        its multi-attribute key.
+   *
+   * @param accessor Source of the input tuples.
+   * @param key_attr_ids Attributes forming the GROUP BY key.
+   * @param argument_ids One argument attribute per aggregate.
+   * @return Always true.
+   **/
+  bool upsertValueAccessorCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const std::vector<attribute_id> &argument_ids) override {
+    if (key_manager_.isKeyNullable()) {
+      return upsertValueAccessorCompositeKeyInternal<true>(
+          accessor, key_attr_ids, argument_ids);
+    } else {
+      return upsertValueAccessorCompositeKeyInternal<false>(
+          accessor, key_attr_ids, argument_ids);
+    }
+  }
+
+  // Implementation of upsertValueAccessorCompositeKey(). The key of each
+  // tuple is first written into a spare ("prealloc") bucket; that bucket
+  // joins the table if the key is new and is reused for the next tuple
+  // otherwise.
+  template <bool check_for_null_keys>
+  bool upsertValueAccessorCompositeKeyInternal(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const std::vector<attribute_id> &argument_ids) {
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+      accessor->beginIteration();
+      void *prealloc_bucket = allocateBucket();
+      while (accessor->next()) {
+        if (check_for_null_keys) {
+          const bool is_null =
+              key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket(
+                  accessor,
+                  key_attr_ids,
+                  prealloc_bucket);
+          if (is_null) {
+            continue;
+          }
+        } else {
+          key_manager_.writeUntypedKeyFromValueAccessorToBucket(
+              accessor,
+              key_attr_ids,
+              prealloc_bucket);
+        }
+        void *bucket = locateBucketWithPrealloc(prealloc_bucket);
+        const bool is_new_key = (bucket == prealloc_bucket);
+        if (is_new_key) {
+          payload_manager_.initializeStates(bucket);
+        }
+        // Accumulate the current tuple for new keys too, and do it before
+        // allocateBucket() below, which may resize() and move the buckets.
+        payload_manager_.template updateStates<check_for_null_keys>(
+            bucket, accessor, argument_ids);
+        if (is_new_key) {
+          prealloc_bucket = allocateBucket();
+        }
+      }
+      // Reclaim the last unused bucket
+      --buckets_allocated_;
+      return true;
+    });
+  }
+
+  /**
+   * @brief Merge all entries of source_hash_table into this table. States
+   *        of keys not yet present are copied, the others are merged.
+   *
+   * @param source_hash_table Table to read entries from.
+   **/
+  void mergeHashTable(
+      const ThreadPrivateAggregationStateHashTable *source_hash_table) {
+    source_hash_table->forEachKeyAndStates(
+        [&](const void *source_key, const void *source_states) -> void {
+          bool is_empty;
+          void *bucket = locateBucket(source_key, &is_empty);
+          if (is_empty) {
+            payload_manager_.copyStates(bucket, source_states);
+          } else {
+            payload_manager_.mergeStates(bucket, source_states);
+          }
+        });
+  }
+
+  /**
+   * @brief Invoke functor(key) for every allocated bucket.
+   **/
+  template <typename FunctorT>
+  inline void forEachKey(const FunctorT &functor) const {
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      functor(key_manager_.getUntypedKeyComponent(locateBucket(i)));
+    }
+  }
+
+  /**
+   * @brief Invoke functor(key, states) for every allocated bucket. The
+   *        bucket address itself is passed as the states pointer.
+   **/
+  template <typename FunctorT>
+  inline void forEachKeyAndStates(const FunctorT &functor) const {
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      const char *bucket = static_cast<const char *>(locateBucket(i));
+      functor(key_manager_.getUntypedKeyComponent(bucket), bucket);
+    }
+  }
+
+  /**
+   * @return Address of the bucket with index bucket_id.
+   **/
+  inline void* locateBucket(const std::size_t bucket_id) const {
+    return static_cast<char *>(buckets_) + bucket_id * bucket_size_;
+  }
+
+  /**
+   * @brief Find the bucket for key, allocating one and storing the key in
+   *        it if the key is not present yet.
+   *
+   * @param key The key to look up.
+   * @param is_empty Set to true if a new bucket was allocated (its states
+   *        are not initialized by this call), false otherwise.
+   * @return The bucket for key.
+   **/
+  inline void* locateBucket(const void *key, bool *is_empty) {
+    auto slot_it = slots_.find(key);
+    if (slot_it == slots_.end()) {
+      void *bucket = allocateBucket();
+      key_manager_.writeUntypedKeyToBucket(key, bucket);
+      // Index the copy of the key stored in the bucket, not the caller's.
+      slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket);
+      *is_empty = true;
+      return bucket;
+    } else {
+      *is_empty = false;
+      return slot_it->second;
+    }
+  }
+
+  /**
+   * @brief Find the bucket whose key equals the key already written into
+   *        prealloc_bucket; if there is none, prealloc_bucket is registered
+   *        for that key.
+   *
+   * @return The existing bucket, or prealloc_bucket if the key was new.
+   **/
+  inline void* locateBucketWithPrealloc(void *prealloc_bucket) {
+    const void *key = key_manager_.getUntypedKeyComponent(prealloc_bucket);
+    auto slot_it = slots_.find(key);
+    if (slot_it == slots_.end()) {
+      slots_.emplace(key, prealloc_bucket);
+      return prealloc_bucket;
+    } else {
+      return slot_it->second;
+    }
+  }
+
+  /**
+   * @brief Hand out the next unused bucket, growing the table when full.
+   *
+   * @note Growing moves the buckets, so bucket pointers obtained before
+   *       this call must not be used afterwards.
+   **/
+  inline void* allocateBucket() {
+    if (buckets_allocated_ >= num_buckets_) {
+      resize();
+    }
+    void *bucket = locateBucket(buckets_allocated_);
+    ++buckets_allocated_;
+    return bucket;
+  }
+
+  /**
+   * @brief Move the buckets into a new blob with (at least) twice the
+   *        capacity and re-point the slot index at the new memory.
+   **/
+  void resize() {
+    const std::size_t resized_memory_required =
+        num_buckets_ * bucket_size_ * 2;
+    const std::size_t resized_storage_slots =
+        this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+    const block_id resized_blob_id =
+        this->storage_manager_->createBlob(resized_storage_slots);
+    MutableBlobReference resized_blob =
+        this->storage_manager_->getBlobMutable(resized_blob_id);
+
+    char *old_base = static_cast<char *>(buckets_);
+    char *new_base = static_cast<char *>(resized_blob->getMemoryMutable());
+    std::memcpy(new_base, old_base, buckets_allocated_ * bucket_size_);
+
+    // Both the keys and the values of slots_ point into the old blob. Map
+    // keys are immutable, so build a fresh index whose keys and values
+    // refer to the copied buckets instead of patching entries in place.
+    decltype(slots_) rebased_slots(slots_.bucket_count(),
+                                   slots_.hash_function(),
+                                   slots_.key_eq());
+    for (const auto &pair : slots_) {
+      void *rebased_bucket =
+          new_base + (static_cast<char *>(pair.second) - old_base);
+      rebased_slots.emplace(
+          key_manager_.getUntypedKeyComponent(rebased_bucket),
+          rebased_bucket);
+    }
+    slots_.swap(rebased_slots);
+
+    buckets_ = new_base;
+    num_buckets_ = resized_storage_slots * kSlotSizeBytes / bucket_size_;
+    // NOTE(review): only the reference to the old blob is dropped here;
+    // confirm whether the old blob should also be deleted through the
+    // StorageManager.
+    std::swap(this->blob_, resized_blob);
+  }
+
+  /**
+   * @brief Debugging aid: dump the slot index to stderr.
+   **/
+  void print() const override {
+    std::cerr << "Bucket size = " << bucket_size_ << "\n";
+    std::cerr << "Buckets: \n";
+    for (const auto &pair : slots_) {
+      std::cerr << pair.first << " -- " << pair.second << "\n";
+      // Reads the first bytes of the bucket as an int.
+      std::cerr << *static_cast<const int *>(pair.second) << "\n";
+    }
+  }
+
+ private:
+  // Helper object to manage hash table payloads (i.e. aggregation states).
+  AggregationStateManager<false> payload_manager_;
+
+  // Type(s) of keys.
+  const std::vector<const Type*> key_types_;
+
+  // Helper object to manage key storage.
+  HashTableUntypedKeyManager key_manager_;
+
+  // Round bucket size up to a multiple of kBucketAlignment.
+  static std::size_t ComputeBucketSize(const std::size_t fixed_key_size,
+                                       const std::size_t total_payload_size) {
+    constexpr std::size_t kBucketAlignment = 4;
+    return (((fixed_key_size + total_payload_size - 1)
+               / kBucketAlignment) + 1) * kBucketAlignment;
+  }
+
+  // Index from (pointer to) the key stored in a bucket to that bucket.
+  std::unordered_map<const void *, void *,
+                     UntypedHashFunctor,
+                     UntypedEqualityFunctor> slots_;
+
+  // Bucket array (memory of blob_), size of one bucket in bytes, capacity
+  // in buckets, and number of buckets handed out so far.
+  void *buckets_;
+  const std::size_t bucket_size_;
+  std::size_t num_buckets_;
+  std::size_t buckets_allocated_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateAggregationStateHashTable);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_
+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationStateManager.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationStateManager.hpp 
b/storage/AggregationStateManager.hpp
new file mode 100644
index 0000000..98dca90
--- /dev/null
+++ b/storage/AggregationStateManager.hpp
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_STATE_MANAGER_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_STATE_MANAGER_HPP_
+
+#include <cstddef>
+#include <cstring>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "utility/InlineMemcpy.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief Describes how the aggregation states of several AggregationHandles
+ *        are packed back to back in one chunk of memory, and applies the
+ *        handles' accumulate / merge / finalize functors to such chunks.
+ *
+ * @tparam use_mutex If true, room for a SpinMutex is reserved at the start
+ *         of every state chunk, in front of the first state.
+ **/
+template <bool use_mutex>
+class AggregationStateManager {
+ public:
+  /**
+   * @brief Constructor. Computes the offsets of every state and result and
+   *        builds a template chunk holding the initial states.
+   *
+   * @param handles One AggregationHandle per aggregate, in state order.
+   **/
+  explicit AggregationStateManager(
+      const std::vector<AggregationHandle *> &handles)
+      : handles_(handles),
+        states_size_in_bytes_(0),
+        results_size_in_bytes_(0) {
+    if (use_mutex) {
+      // The mutex sits in front of the first state.
+      states_size_in_bytes_ += sizeof(SpinMutex);
+    }
+    for (const AggregationHandle *handle : handles) {
+      const std::size_t state_size = handle->getStateSize();
+      state_sizes_.emplace_back(state_size);
+      state_offsets_.emplace_back(states_size_in_bytes_);
+      states_size_in_bytes_ += state_size;
+
+      const std::size_t result_size = handle->getResultSize();
+      result_sizes_.emplace_back(result_size);
+      result_offsets_.emplace_back(results_size_in_bytes_);
+      results_size_in_bytes_ += result_size;
+
+      accumulate_functors_.emplace_back(handle->getStateAccumulateFunctor());
+      merge_functors_.emplace_back(handle->getStateMergeFunctor());
+      finalize_functors_.emplace_back(handle->getStateFinalizeFunctor());
+    }
+
+    initial_states_.reset(states_size_in_bytes_, false);
+    if (use_mutex) {
+      // Construct the same type whose size was reserved above.
+      new(initial_states_.get()) SpinMutex;
+    }
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      handles_[i]->initializeState(
+          static_cast<char *>(initial_states_.get()) + state_offsets_[i]);
+    }
+  }
+
+  /**
+   * @return Size in bytes of one state chunk (including the mutex, if any).
+   **/
+  inline std::size_t getStatesSizeInBytes() const {
+    return states_size_in_bytes_;
+  }
+
+  /**
+   * @return Size in bytes of the finalized results of one state chunk.
+   **/
+  inline std::size_t getResultsSizeInBytes() const {
+    return results_size_in_bytes_;
+  }
+
+  /**
+   * @brief Overwrite states with a copy of the initial states.
+   **/
+  inline void initializeStates(void *states) const {
+    copyStates(states, initial_states_.get());
+  }
+
+  /**
+   * @brief Accumulate one argument value into the first state of a chunk.
+   *
+   * @param states Start of the state chunk.
+   * @param accessor Accessor positioned on the current tuple.
+   * @param argument_id Attribute holding the argument value.
+   **/
+  template <bool check_for_null_keys, typename ValueAccessorT>
+  inline void updateState(void *states,
+                          ValueAccessorT *accessor,
+                          const attribute_id argument_id) const {
+    // TODO: templates on whether to check invalid attribute id
+    DCHECK_NE(argument_id, kInvalidAttributeID);
+
+    const void *value =
+        accessor->template getUntypedValue<check_for_null_keys>(argument_id);
+    if (check_for_null_keys && value == nullptr) {
+      return;
+    }
+    // Go through the state offset: with use_mutex the first state does not
+    // start at the beginning of the chunk.
+    accumulate_functors_.front()(getStateComponent(states, 0), value);
+  }
+
+  /**
+   * @brief Accumulate the i-th argument value into the i-th state, for
+   *        every entry of argument_ids.
+   *
+   * @param states Start of the state chunk.
+   * @param accessor Accessor positioned on the current tuple.
+   * @param argument_ids One argument attribute per aggregate.
+   **/
+  template <bool check_for_null_keys, typename ValueAccessorT>
+  inline void updateStates(
+      void *states,
+      ValueAccessorT *accessor,
+      const std::vector<attribute_id> &argument_ids) const {
+    // NOTE(review): NULL checking of the argument values is tied to
+    // check_for_null_keys here; confirm this is intended when the keys are
+    // not nullable but the arguments are.
+    for (std::size_t i = 0; i < argument_ids.size(); ++i) {
+      // TODO: templates on whether to check invalid attribute id
+      DCHECK_NE(argument_ids[i], kInvalidAttributeID);
+
+      const void *value =
+          accessor->template getUntypedValue<check_for_null_keys>(
+              argument_ids[i]);
+      if (check_for_null_keys && value == nullptr) {
+        // A NULL argument only skips its own aggregate; the remaining
+        // aggregates must still see this tuple.
+        continue;
+      }
+      accumulate_functors_[i](getStateComponent(states, i), value);
+    }
+  }
+
+  /**
+   * @brief Byte-copy a whole state chunk.
+   **/
+  inline void copyStates(void *destination_states,
+                         const void *source_states) const {
+    InlineMemcpy(destination_states, source_states, states_size_in_bytes_);
+  }
+
+  /**
+   * @brief Merge every state of source_states into the corresponding state
+   *        of destination_states.
+   **/
+  inline void mergeStates(void *destination_states,
+                          const void *source_states) const {
+    for (std::size_t i = 0; i < merge_functors_.size(); ++i) {
+      merge_functors_[i](getStateComponent(destination_states, i),
+                         getStateComponent(source_states, i));
+    }
+  }
+
+  /**
+   * @brief Finalize every state of states into the corresponding slot of
+   *        results.
+   **/
+  inline void finalizeStates(void *results, const void *states) const {
+    for (std::size_t i = 0; i < finalize_functors_.size(); ++i) {
+      finalize_functors_[i](getResultComponent(results, i),
+                            getStateComponent(states, i));
+    }
+  }
+
+  /**
+   * @return Address of the component_id-th state inside states.
+   **/
+  inline const void* getStateComponent(const void *states,
+                                       const std::size_t component_id) const {
+    return static_cast<const char *>(states) + state_offsets_[component_id];
+  }
+
+  /**
+   * @return Address of the component_id-th state inside states.
+   **/
+  inline void* getStateComponent(void *states,
+                                 const std::size_t component_id) const {
+    return static_cast<char *>(states) + state_offsets_[component_id];
+  }
+
+  /**
+   * @return Address of the component_id-th result inside results.
+   **/
+  inline void* getResultComponent(void *results,
+                                  const std::size_t component_id) const {
+    return static_cast<char *>(results) + result_offsets_[component_id];
+  }
+
+ private:
+  std::vector<AggregationHandle *> handles_;
+
+  // Per-aggregate state size and offset, and the size of a whole chunk.
+  std::vector<std::size_t> state_sizes_;
+  std::vector<std::size_t> state_offsets_;
+  std::size_t states_size_in_bytes_;
+
+  // Per-aggregate result size and offset, and the size of all results.
+  std::vector<std::size_t> result_sizes_;
+  std::vector<std::size_t> result_offsets_;
+  std::size_t results_size_in_bytes_;
+
+  std::vector<AggregationStateAccumulateFunctor> accumulate_functors_;
+  std::vector<AggregationStateMergeFunctor> merge_functors_;
+  std::vector<AggregationStateFinalizeFunctor> finalize_functors_;
+
+  // Template chunk that initializeStates() copies from.
+  ScopedBuffer initial_states_;
+
+  DISALLOW_COPY_AND_ASSIGN(AggregationStateManager);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_AGGREGATION_STATE_MANAGER_HPP_
+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index bdc7596..0aaaca4 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -145,11 +145,13 @@ if (ENABLE_DISTRIBUTED)
 endif()
 
 # Declare micro-libs:
-add_library(quickstep_storage_AggregationHashTable ../empty_src.cpp 
AggregationHashTable.hpp)
+# Header-only micro-libs: each one lists ../empty_src.cpp next to its header.
+add_library(quickstep_storage_AggregationStateHashTable ../empty_src.cpp 
AggregationStateHashTable.hpp)
+add_library(quickstep_storage_AggregationStateManager ../empty_src.cpp 
AggregationStateManager.hpp)
 add_library(quickstep_storage_AggregationOperationState
             AggregationOperationState.cpp
             AggregationOperationState.hpp)
 add_library(quickstep_storage_AggregationOperationState_proto 
${storage_AggregationOperationState_proto_srcs})
+# Header-only micro-lib: lists ../empty_src.cpp next to its header.
+add_library(quickstep_storage_AggregationResultIterator ../empty_src.cpp 
AggregationResultIterator.hpp)
 add_library(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
             BasicColumnStoreTupleStorageSubBlock.cpp
             BasicColumnStoreTupleStorageSubBlock.hpp)
@@ -199,9 +201,6 @@ if (ENABLE_DISTRIBUTED)
 endif()
 
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp 
EvictionPolicy.hpp)
-add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
-add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp 
FastHashTableFactory.hpp)
-add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp 
FastSeparateChainingHashTable.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp 
FileManagerHdfs.hpp)
@@ -271,9 +270,11 @@ 
add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_Wi
 
 
 # Link dependencies:
-target_link_libraries(quickstep_storage_AggregationHashTable
+target_link_libraries(quickstep_storage_AggregationStateHashTable
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_AggregationResultIterator
+                      quickstep_storage_AggregationStateManager
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableUntypedKeyManager
                       quickstep_storage_StorageBlob
@@ -290,8 +291,18 @@ 
target_link_libraries(quickstep_storage_AggregationHashTable
                       quickstep_types_TypedValue
                       quickstep_utility_Alignment
                       quickstep_utility_HashPair
+                      quickstep_utility_InlineMemcpy
                       quickstep_utility_Macros
-                      quickstep_utility_PrimeNumber)
+                      quickstep_utility_PrimeNumber
+                      quickstep_utility_ScopedBuffer)
+# AggregationStateManager.hpp includes glog/logging.h (for DCHECK_NE), so
+# glog is linked in addition to the quickstep micro-libs.
+target_link_libraries(quickstep_storage_AggregationStateManager
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_threading_SpinMutex
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_utility_InlineMemcpy
+                      quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer)
 target_link_libraries(quickstep_storage_AggregationOperationState
                       glog
                       quickstep_catalog_CatalogDatabaseLite
@@ -302,13 +313,11 @@ 
target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_aggregation_AggregateFunction
                       
quickstep_expressions_aggregation_AggregateFunctionFactory
                       quickstep_expressions_aggregation_AggregationHandle
-                      
quickstep_expressions_aggregation_AggregationHandleDistinct
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
-                      quickstep_storage_AggregationHashTable
+                      quickstep_storage_AggregationStateHashTable
                       quickstep_storage_AggregationOperationState_proto
-                      quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
@@ -321,12 +330,17 @@ 
target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
-                      quickstep_utility_Macros)
+                      quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer)
 target_link_libraries(quickstep_storage_AggregationOperationState_proto
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction_proto
                       quickstep_storage_HashTable_proto
                       ${PROTOBUF_LIBRARY})
+# Mirrors the project headers included by AggregationResultIterator.hpp:
+# AggregationStateManager.hpp, HashTableUntypedKeyManager.hpp and Macros.hpp.
+target_link_libraries(quickstep_storage_AggregationResultIterator
+                      quickstep_storage_AggregationStateManager
+                      quickstep_storage_HashTableUntypedKeyManager
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
@@ -654,53 +668,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_threading_SpinMutex
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTable
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_storage_TupleReference
-                      quickstep_storage_ValueAccessor
-                      quickstep_storage_ValueAccessorUtil
-                      quickstep_threading_SpinMutex
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_BloomFilter
-                      quickstep_utility_HashPair
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTableFactory
-                      glog
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTable_proto
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
-                      quickstep_storage_LinearOpenAddressingHashTable
-                      quickstep_storage_SeparateChainingHashTable
-                      quickstep_storage_SimpleScalarSeparateChainingHashTable
-                      quickstep_storage_TupleReference
-                      quickstep_types_TypeFactory
-                      quickstep_utility_BloomFilter
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableKeyManager
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_Alignment
-                      quickstep_utility_Macros
-                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_FileManager
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
@@ -786,9 +753,7 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
 target_link_libraries(quickstep_storage_HashTablePool
                       glog
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_AggregationHashTable
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
+                      quickstep_storage_AggregationStateHashTable
                       quickstep_storage_HashTableBase
                       quickstep_threading_SpinMutex
                       quickstep_utility_Macros
@@ -799,6 +764,7 @@ target_link_libraries(quickstep_storage_HashTableUntypedKeyManager
                       quickstep_types_Type
                       quickstep_types_TypeFunctors
                       quickstep_types_TypedValue
+                      quickstep_utility_InlineMemcpy
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_IndexSubBlock
                       quickstep_catalog_CatalogTypedefs
@@ -820,6 +786,7 @@ target_link_libraries(quickstep_storage_InsertDestination
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_storage_AggregationResultIterator
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_StorageBlock
@@ -859,6 +826,7 @@ target_link_libraries(quickstep_storage_PackedRowStoreTupleStorageSubBlock
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_predicate_PredicateCost
+                      quickstep_storage_AggregationResultIterator
                       quickstep_storage_PackedRowStoreValueAccessor
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageBlockLayout_proto
@@ -994,6 +962,7 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
+                      quickstep_storage_AggregationResultIterator
                       quickstep_storage_BasicColumnStoreTupleStorageSubBlock
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
@@ -1022,7 +991,8 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_types_containers_Tuple
                       quickstep_types_operations_comparisons_ComparisonUtil
                       quickstep_utility_Macros
-                      quickstep_utility_PtrVector)
+                      quickstep_utility_PtrVector
+                      quickstep_utility_ScopedBuffer)
 # CMAKE_VALIDATE_IGNORE_BEGIN
 if(QUICKSTEP_HAVE_BITWEAVING)
   target_link_libraries(quickstep_storage_StorageBlock
@@ -1184,9 +1154,6 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
                       quickstep_storage_FileManagerLocal
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
-                      quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
@@ -1208,6 +1175,9 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_SplitRowStoreTupleStorageSubBlock
                       quickstep_storage_SplitRowStoreValueAccessor
+                      quickstep_storage_AggregationResultIterator
+                      quickstep_storage_AggregationStateHashTable
+                      quickstep_storage_AggregationStateManager
                       quickstep_storage_StorageBlob
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockBase


Reply via email to