http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c123bd49/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
index 0e35151..1d1c084 100644
--- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
@@ -28,6 +28,8 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleSum.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/FastHashTableFactory.hpp"
 #include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
@@ -52,51 +54,56 @@
 
 namespace quickstep {
 
-class AggregationHandleSumTest : public::testing::Test {
+class AggregationHandleSumTest : public ::testing::Test {
  protected:
   static const int kNumSamples = 1000;
 
   // Helper method that calls AggregationHandleSum::iterateUnaryInl() to
   // aggregate 'value' into '*state'.
   void iterateHandle(AggregationState *state, const TypedValue &value) {
-    static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_).iterateUnaryInl(
-        static_cast<AggregationStateSum*>(state),
-        value);
+    static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_)
+        .iterateUnaryInl(static_cast<AggregationStateSum *>(state), value);
   }
 
   void initializeHandle(const Type &type) {
     aggregation_handle_sum_.reset(
-        AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
-            std::vector<const Type*>(1, &type)));
+        AggregateFunctionFactory::Get(AggregationID::kSum)
+            .createHandle(std::vector<const Type *>(1, &type)));
     aggregation_handle_sum_state_.reset(
         aggregation_handle_sum_->createInitialState());
   }
 
   static bool ApplyToTypesTest(TypeID typeID) {
-    const Type &type = (typeID == kChar || typeID == kVarChar) ?
-        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
-        TypeFactory::GetType(typeID);
+    const Type &type =
+        (typeID == kChar || typeID == kVarChar)
+            ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+            : TypeFactory::GetType(typeID);
 
-    return AggregateFunctionFactory::Get(AggregationID::kSum).canApplyToTypes(
-        std::vector<const Type*>(1, &type));
+    return AggregateFunctionFactory::Get(AggregationID::kSum)
+        .canApplyToTypes(std::vector<const Type *>(1, &type));
   }
 
   static bool ResultTypeForArgumentTypeTest(TypeID input_type_id,
                                             TypeID output_type_id) {
-    const Type *result_type
-        = AggregateFunctionFactory::Get(AggregationID::kSum).resultTypeForArgumentTypes(
-            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    const Type *result_type =
+        AggregateFunctionFactory::Get(AggregationID::kSum)
+            .resultTypeForArgumentTypes(std::vector<const Type *>(
+                1, &TypeFactory::GetType(input_type_id)));
     return (result_type->getTypeID() == output_type_id);
   }
 
   template <typename CppType>
-  static void CheckSumValue(
-      CppType expected,
-      const AggregationHandle &target,
-      const AggregationState &state) {
+  static void CheckSumValue(CppType expected,
+                            const AggregationHandle &target,
+                            const AggregationState &state) {
     EXPECT_EQ(expected, target.finalize(state).getLiteral<CppType>());
   }
 
+  template <typename CppType>
+  static void CheckSumValue(CppType expected, const TypedValue &value) {
+    EXPECT_EQ(expected, value.getLiteral<CppType>());
+  }
+
   // Static templated method to set a meaningful to data types.
   template <typename CppType>
   static void SetDataType(int value, CppType *data) {
@@ -108,7 +115,9 @@ class AggregationHandleSumTest : public::testing::Test {
     const GenericType &type = GenericType::Instance(true);
 
     initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+            .isNull());
 
     typename GenericType::cpptype val;
     typename PrecisionType::cpptype sum;
@@ -119,13 +128,14 @@ class AggregationHandleSumTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       iterateHandle(aggregation_handle_sum_state_.get(), type.makeValue(&val));
       sum += val;
     }
     iterateHandle(aggregation_handle_sum_state_.get(), type.makeNullValue());
-    CheckSumValue<typename PrecisionType::cpptype>(sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
+    CheckSumValue<typename PrecisionType::cpptype>(
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
 
     // Test mergeStates().
     std::unique_ptr<AggregationState> merge_state(
@@ -138,7 +148,7 @@ class AggregationHandleSumTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       iterateHandle(merge_state.get(), type.makeValue(&val));
       sum += val;
@@ -146,13 +156,11 @@ class AggregationHandleSumTest : public::testing::Test {
     aggregation_handle_sum_->mergeStates(*merge_state,
                                          aggregation_handle_sum_state_.get());
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *aggregation_handle_sum_state_);
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
   }
 
   template <typename GenericType, typename Output>
-  ColumnVector *createColumnVectorGeneric(const Type &type, Output *sum) {
+  ColumnVector* createColumnVectorGeneric(const Type &type, Output *sum) {
     NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
 
     typename GenericType::cpptype val;
@@ -163,12 +171,12 @@ class AggregationHandleSumTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       column->appendTypedValue(type.makeValue(&val));
       *sum += val;
       // One NULL in the middle.
-      if (i == kNumSamples/2) {
+      if (i == kNumSamples / 2) {
         column->appendTypedValue(type.makeNullValue());
       }
     }
@@ -182,12 +190,15 @@ class AggregationHandleSumTest : public::testing::Test {
     const GenericType &type = GenericType::Instance(true);
 
     initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+            .isNull());
 
     typename PrecisionType::cpptype sum;
     std::vector<std::unique_ptr<ColumnVector>> column_vectors;
     column_vectors.emplace_back(
-        createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(type, &sum));
+        createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(
+            type, &sum));
 
     std::unique_ptr<AggregationState> cv_state(
         aggregation_handle_sum_->accumulateColumnVectors(column_vectors));
@@ -195,15 +206,12 @@ class AggregationHandleSumTest : public::testing::Test {
     // Test the state generated directly by accumulateColumnVectors(), and also
     // test after merging back.
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *cv_state);
+        sum, *aggregation_handle_sum_, *cv_state);
 
-    aggregation_handle_sum_->mergeStates(*cv_state, aggregation_handle_sum_state_.get());
+    aggregation_handle_sum_->mergeStates(*cv_state,
+                                         aggregation_handle_sum_state_.get());
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *aggregation_handle_sum_state_);
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
   }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -212,29 +220,30 @@ class AggregationHandleSumTest : public::testing::Test {
     const GenericType &type = GenericType::Instance(true);
 
     initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+            .isNull());
 
     typename PrecisionType::cpptype sum;
-    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
+    std::unique_ptr<ColumnVectorsValueAccessor> accessor(
+        new ColumnVectorsValueAccessor());
     accessor->addColumn(
-        createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(type, &sum));
+        createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(
+            type, &sum));
 
     std::unique_ptr<AggregationState> va_state(
-        aggregation_handle_sum_->accumulateValueAccessor(accessor.get(),
-                                                         std::vector<attribute_id>(1, 0)));
+        aggregation_handle_sum_->accumulateValueAccessor(
+            accessor.get(), std::vector<attribute_id>(1, 0)));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *va_state);
+        sum, *aggregation_handle_sum_, *va_state);
 
-    aggregation_handle_sum_->mergeStates(*va_state, aggregation_handle_sum_state_.get());
+    aggregation_handle_sum_->mergeStates(*va_state,
+                                         aggregation_handle_sum_state_.get());
     CheckSumValue<typename PrecisionType::cpptype>(
-        sum,
-        *aggregation_handle_sum_,
-        *aggregation_handle_sum_state_);
+        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
   }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -247,9 +256,7 @@ const int AggregationHandleSumTest::kNumSamples;
 
 template <>
 void AggregationHandleSumTest::CheckSumValue<float>(
-    float val,
-    const AggregationHandle &handle,
-    const AggregationState &state) {
+    float val, const AggregationHandle &handle, const AggregationState &state) {
   EXPECT_FLOAT_EQ(val, handle.finalize(state).getLiteral<float>());
 }
 
@@ -262,12 +269,14 @@ void AggregationHandleSumTest::CheckSumValue<double>(
 }
 
 template <>
-void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
+void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>(
+    int value, DatetimeIntervalLit *data) {
   data->interval_ticks = value;
 }
 
 template <>
-void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
+void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>(
+    int value, YearMonthIntervalLit *data) {
   data->months = value;
 }
 
@@ -314,11 +323,13 @@ TEST_F(AggregationHandleSumTest, DoubleTypeColumnVectorTest) {
 }
 
 TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
+  checkAggregationSumGenericColumnVector<DatetimeIntervalType,
+                                         DatetimeIntervalType>();
 }
 
 TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
+  checkAggregationSumGenericColumnVector<YearMonthIntervalType,
+                                         YearMonthIntervalType>();
 }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -339,11 +350,13 @@ TEST_F(AggregationHandleSumTest, DoubleTypeValueAccessorTest) {
 }
 
 TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeValueAccessorTest) {
-  checkAggregationSumGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
+  checkAggregationSumGenericValueAccessor<DatetimeIntervalType,
+                                          DatetimeIntervalType>();
 }
 
 TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeValueAccessorTest) {
-  checkAggregationSumGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
+  checkAggregationSumGenericValueAccessor<YearMonthIntervalType,
+                                          YearMonthIntervalType>();
 }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -373,38 +386,53 @@ TEST_F(AggregationHandleSumDeathTest, WrongTypeTest) {
   float float_val = 0;
 
   // Passes.
-  iterateHandle(aggregation_handle_sum_state_.get(), int_non_null_type.makeValue(&int_val));
+  iterateHandle(aggregation_handle_sum_state_.get(),
+                int_non_null_type.makeValue(&int_val));
 
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), long_type.makeValue(&long_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), double_type.makeValue(&double_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), float_type.makeValue(&float_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), char_type.makeValue("asdf", 5)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), varchar_type.makeValue("asdf", 5)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             long_type.makeValue(&long_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             double_type.makeValue(&double_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             float_type.makeValue(&float_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             char_type.makeValue("asdf", 5)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+                             varchar_type.makeValue("asdf", 5)),
+               "");
 
   // Test mergeStates() with incorrectly typed handles.
   std::unique_ptr<AggregationHandle> aggregation_handle_sum_double(
-      AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
-          std::vector<const Type*>(1, &double_type)));
+      AggregateFunctionFactory::Get(AggregationID::kSum)
+          .createHandle(std::vector<const Type *>(1, &double_type)));
   std::unique_ptr<AggregationState> aggregation_state_sum_merge_double(
       aggregation_handle_sum_double->createInitialState());
-  static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_double).iterateUnaryInl(
-      static_cast<AggregationStateSum*>(aggregation_state_sum_merge_double.get()),
-      double_type.makeValue(&double_val));
-  EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double,
-                                                    aggregation_handle_sum_state_.get()),
-               "");
+  static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_double)
+      .iterateUnaryInl(static_cast<AggregationStateSum *>(
+                           aggregation_state_sum_merge_double.get()),
+                       double_type.makeValue(&double_val));
+  EXPECT_DEATH(
+      aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double,
+                                           aggregation_handle_sum_state_.get()),
+      "");
 
   std::unique_ptr<AggregationHandle> aggregation_handle_sum_float(
-      AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
-          std::vector<const Type*>(1, &float_type)));
+      AggregateFunctionFactory::Get(AggregationID::kSum)
+          .createHandle(std::vector<const Type *>(1, &float_type)));
   std::unique_ptr<AggregationState> aggregation_state_sum_merge_float(
       aggregation_handle_sum_float->createInitialState());
-  static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_float).iterateUnaryInl(
-      static_cast<AggregationStateSum*>(aggregation_state_sum_merge_float.get()),
-      float_type.makeValue(&float_val));
-  EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float,
-                                                    aggregation_handle_sum_state_.get()),
-               "");
+  static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_float)
+      .iterateUnaryInl(static_cast<AggregationStateSum *>(
+                           aggregation_state_sum_merge_float.get()),
+                       float_type.makeValue(&float_val));
+  EXPECT_DEATH(
+      aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float,
+                                           aggregation_handle_sum_state_.get()),
+      "");
 }
 #endif
 
@@ -425,8 +453,10 @@ TEST_F(AggregationHandleSumTest, ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kLong));
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
+  EXPECT_TRUE(
+      ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
+  EXPECT_TRUE(
+      ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
 }
 
 TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
@@ -434,25 +464,28 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   initializeHandle(long_non_null_type);
   storage_manager_.reset(new StorageManager("./test_sum_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      aggregation_handle_sum_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
+          {aggregation_handle_sum_.get()->getPayloadSize()},
+          {aggregation_handle_sum_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      aggregation_handle_sum_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
+          {aggregation_handle_sum_.get()->getPayloadSize()},
+          {aggregation_handle_sum_.get()},
           storage_manager_.get()));
 
-  AggregationStateHashTable<AggregationStateSum> *destination_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateSum> *>(
+  AggregationStateFastHashTable *destination_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(
           destination_hash_table.get());
 
-  AggregationStateHashTable<AggregationStateSum> *source_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateSum> *>(
-          source_hash_table.get());
+  AggregationStateFastHashTable *source_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
 
   AggregationHandleSum *aggregation_handle_sum_derived =
       static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get());
@@ -471,7 +504,8 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   const std::int64_t common_key_destination_sum = 4000;
   TypedValue common_key_destination_sum_val(common_key_destination_sum);
 
-  const std::int64_t merged_common_key = common_key_source_sum + common_key_destination_sum;
+  const std::int64_t merged_common_key =
+      common_key_source_sum + common_key_destination_sum;
   TypedValue common_key_merged_val(merged_common_key);
 
   const std::int64_t exclusive_key_source_sum = 100;
@@ -496,59 +530,82 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   // Create sum value states for keys.
   aggregation_handle_sum_derived->iterateUnaryInl(common_key_source_state.get(),
                                                   common_key_source_sum_val);
-  std::int64_t actual_val = aggregation_handle_sum_->finalize(*common_key_source_state)
-                       .getLiteral<std::int64_t>();
+  std::int64_t actual_val =
+      aggregation_handle_sum_->finalize(*common_key_source_state)
+          .getLiteral<std::int64_t>();
   EXPECT_EQ(common_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
 
   aggregation_handle_sum_derived->iterateUnaryInl(
       common_key_destination_state.get(), common_key_destination_sum_val);
   actual_val = aggregation_handle_sum_->finalize(*common_key_destination_state)
                    .getLiteral<std::int64_t>();
-  EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+  EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(),
+            actual_val);
 
   aggregation_handle_sum_derived->iterateUnaryInl(
       exclusive_key_destination_state.get(), exclusive_key_destination_sum_val);
   actual_val =
       aggregation_handle_sum_->finalize(*exclusive_key_destination_state)
           .getLiteral<std::int64_t>();
-  EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+  EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+            actual_val);
 
   aggregation_handle_sum_derived->iterateUnaryInl(
       exclusive_key_source_state.get(), exclusive_key_source_sum_val);
   actual_val = aggregation_handle_sum_->finalize(*exclusive_key_source_state)
                    .getLiteral<std::int64_t>();
-  EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
+  EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+            actual_val);
 
   // Add the key-state pairs to the hash tables.
-  source_hash_table_derived->putCompositeKey(common_key,
-                                             *common_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      common_key, *common_key_destination_state);
-  source_hash_table_derived->putCompositeKey(exclusive_source_key,
-                                             *exclusive_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      exclusive_destination_key, *exclusive_key_destination_state);
+  unsigned char buffer[100];
+  buffer[0] = '\0';
+  memcpy(buffer + 1,
+         common_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         common_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_sum_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
+                                                      buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  aggregation_handle_sum_->mergeGroupByHashTables(*source_hash_table,
-                                                  destination_hash_table.get());
+  AggregationOperationState::mergeGroupByHashTables(
+      source_hash_table.get(), destination_hash_table.get());
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckSumValue<std::int64_t>(
       common_key_merged_val.getLiteral<std::int64_t>(),
-      *aggregation_handle_sum_derived,
-      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
-  CheckSumValue<std::int64_t>(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
-                     *aggregation_handle_sum_derived,
-                     *(destination_hash_table_derived->getSingleCompositeKey(
-                         exclusive_destination_key)));
-  CheckSumValue<std::int64_t>(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
-                     *aggregation_handle_sum_derived,
-                     *(source_hash_table_derived->getSingleCompositeKey(
-                         exclusive_source_key)));
+      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+          destination_hash_table_derived->getSingleCompositeKey(common_key) +
+          1));
+  CheckSumValue<std::int64_t>(
+      exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+          destination_hash_table_derived->getSingleCompositeKey(
+              exclusive_destination_key) +
+          1));
+  CheckSumValue<std::int64_t>(
+      exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+          source_hash_table_derived->getSingleCompositeKey(
+              exclusive_source_key) +
+          1));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c123bd49/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 05d0636..c5f59f9 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -59,7 +59,7 @@ namespace quickstep {
 
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
-    const std::vector<const AggregateFunction*> &aggregate_functions,
+    const std::vector<const AggregateFunction *> &aggregate_functions,
     std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
     std::vector<bool> &&is_distinct,
     std::vector<std::unique_ptr<const Scalar>> &&group_by,
@@ -78,7 +78,7 @@ AggregationOperationState::AggregationOperationState(
   DCHECK(aggregate_functions.size() == arguments_.size());
 
   // Get the types of GROUP BY expressions for creating HashTables below.
-  std::vector<const Type*> group_by_types;
+  std::vector<const Type *> group_by_types;
   for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
     group_by_types.emplace_back(&group_by_element->getType());
   }
@@ -94,27 +94,29 @@ AggregationOperationState::AggregationOperationState(
     handles_.emplace_back(new AggregationHandleDistinct());
     arguments_.push_back({});
     is_distinct_.emplace_back(false);
-    group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
-        new HashTablePool(estimated_num_entries,
-                          hash_table_impl_type,
-                          group_by_types,
-                          {1},
-                          handles_,
-                          storage_manager)));
+    group_by_hashtable_pools_.emplace_back(
+        std::unique_ptr<HashTablePool>(new HashTablePool(estimated_num_entries,
+                                                         hash_table_impl_type,
+                                                         group_by_types,
+                                                         {1},
+                                                         handles_,
+                                                         storage_manager)));
   } else {
     // Set up each individual aggregate in this operation.
-    std::vector<const AggregateFunction*>::const_iterator agg_func_it
-        = aggregate_functions.begin();
-    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it
-        = arguments_.begin();
+    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
+        aggregate_functions.begin();
+    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
+        args_it = arguments_.begin();
     std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
-    std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it
-        = distinctify_hash_table_impl_types.begin();
+    std::vector<HashTableImplType>::const_iterator
+        distinctify_hash_table_impl_types_it =
+            distinctify_hash_table_impl_types.begin();
     std::vector<std::size_t> payload_sizes;
-    for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
+    for (; agg_func_it != aggregate_functions.end();
+         ++agg_func_it, ++args_it, ++is_distinct_it) {
       // Get the Types of this aggregate's arguments so that we can create an
       // AggregationHandle.
-      std::vector<const Type*> argument_types;
+      std::vector<const Type *> argument_types;
       for (const std::unique_ptr<const Scalar> &argument : *args_it) {
         argument_types.emplace_back(&argument->getType());
       }
@@ -129,12 +131,13 @@ AggregationOperationState::AggregationOperationState(
       handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
 
       if (!group_by_list_.empty()) {
-        // Aggregation with GROUP BY: combined payload is partially updated in the presence of DISTINCT.
-         if (*is_distinct_it) {
-            handles_.back()->BlockUpdate();
-         }
-         group_by_handles.emplace_back(handles_.back());
-         payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
+        // Aggregation with GROUP BY: combined payload is partially updated in
+        // the presence of DISTINCT.
+        if (*is_distinct_it) {
+          handles_.back()->blockUpdate();
+        }
+        group_by_handles.emplace_back(handles_.back());
+        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
@@ -146,31 +149,38 @@ AggregationOperationState::AggregationOperationState(
         std::vector<attribute_id> local_arguments_as_attributes;
         local_arguments_as_attributes.reserve(args_it->size());
         for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-          const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
+          const attribute_id argument_id =
+              argument->getAttributeIdForValueAccessor();
           if (argument_id == -1) {
             local_arguments_as_attributes.clear();
             break;
           } else {
-            DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+            DCHECK_EQ(input_relation_.getID(),
+                      argument->getRelationIdForValueAccessor());
             local_arguments_as_attributes.push_back(argument_id);
           }
         }
 
-        arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes));
+        arguments_as_attributes_.emplace_back(
+            std::move(local_arguments_as_attributes));
 #endif
       }
 
-      // Initialize the corresponding distinctify hash table if this is a DISTINCT
+      // Initialize the corresponding distinctify hash table if this is a
+      // DISTINCT
       // aggregation.
       if (*is_distinct_it) {
-        std::vector<const Type*> key_types(group_by_types);
-        key_types.insert(key_types.end(), argument_types.begin(), argument_types.end());
-        // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating
+        std::vector<const Type *> key_types(group_by_types);
+        key_types.insert(
+            key_types.end(), argument_types.begin(), argument_types.end());
+        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
+        // estimating
         // the number of entries in the distinctify hash table. We may estimate
-        // for each distinct aggregation an estimated_num_distinct_keys value during
+        // for each distinct aggregation an estimated_num_distinct_keys value
+        // during
         // query optimization, if it worths.
         distinctify_hashtables_.emplace_back(
-        AggregationStateFastHashTableFactory::CreateResizable(
+            AggregationStateFastHashTableFactory::CreateResizable(
                 *distinctify_hash_table_impl_types_it,
                 key_types,
                 estimated_num_entries,
@@ -184,16 +194,17 @@ AggregationOperationState::AggregationOperationState(
     }
 
     if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+      // Aggregation with GROUP BY: create a HashTable pool for per-group
+      // states.
       group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
-            new HashTablePool(estimated_num_entries,
-                              hash_table_impl_type,
-                              group_by_types,
-                              payload_sizes,
-                              group_by_handles,
-                              storage_manager)));
-      }
+          new HashTablePool(estimated_num_entries,
+                            hash_table_impl_type,
+                            group_by_types,
+                            payload_sizes,
+                            group_by_handles,
+                            storage_manager)));
     }
+  }
 }
 
 AggregationOperationState* AggregationOperationState::ReconstructFromProto(
@@ -203,7 +214,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
   DCHECK(ProtoIsValid(proto, database));
 
   // Rebuild contructor arguments from their representation in 'proto'.
-  std::vector<const AggregateFunction*> aggregate_functions;
+  std::vector<const AggregateFunction *> aggregate_functions;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments;
   std::vector<bool> is_distinct;
   std::vector<HashTableImplType> distinctify_hash_table_impl_types;
@@ -216,62 +227,63 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
 
     arguments.emplace_back();
     arguments.back().reserve(agg_proto.argument_size());
-    for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); ++argument_idx) {
+    for (int argument_idx = 0; argument_idx < agg_proto.argument_size();
+         ++argument_idx) {
       arguments.back().emplace_back(ScalarFactory::ReconstructFromProto(
-          agg_proto.argument(argument_idx),
-          database));
+          agg_proto.argument(argument_idx), database));
     }
 
     is_distinct.emplace_back(agg_proto.is_distinct());
 
     if (agg_proto.is_distinct()) {
       distinctify_hash_table_impl_types.emplace_back(
-          HashTableImplTypeFromProto(
-              proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index)));
+          HashTableImplTypeFromProto(proto.distinctify_hash_table_impl_types(
+              distinctify_hash_table_impl_type_index)));
       ++distinctify_hash_table_impl_type_index;
     }
   }
 
   std::vector<std::unique_ptr<const Scalar>> group_by_expressions;
-  for (int group_by_idx = 0;
-       group_by_idx < proto.group_by_expressions_size();
+  for (int group_by_idx = 0; group_by_idx < proto.group_by_expressions_size();
        ++group_by_idx) {
     group_by_expressions.emplace_back(ScalarFactory::ReconstructFromProto(
-        proto.group_by_expressions(group_by_idx),
-        database));
+        proto.group_by_expressions(group_by_idx), database));
   }
 
   unique_ptr<Predicate> predicate;
   if (proto.has_predicate()) {
     predicate.reset(
-        PredicateFactory::ReconstructFromProto(proto.predicate(),
-                                               database));
+        PredicateFactory::ReconstructFromProto(proto.predicate(), database));
   }
 
-  return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
-                                       aggregate_functions,
-                                       std::move(arguments),
-                                       std::move(is_distinct),
-                                       std::move(group_by_expressions),
-                                       predicate.release(),
-                                       proto.estimated_num_entries(),
-                                       HashTableImplTypeFromProto(proto.hash_table_impl_type()),
-                                       distinctify_hash_table_impl_types,
-                                       storage_manager);
+  return new AggregationOperationState(
+      database.getRelationSchemaById(proto.relation_id()),
+      aggregate_functions,
+      std::move(arguments),
+      std::move(is_distinct),
+      std::move(group_by_expressions),
+      predicate.release(),
+      proto.estimated_num_entries(),
+      HashTableImplTypeFromProto(proto.hash_table_impl_type()),
+      distinctify_hash_table_impl_types,
+      storage_manager);
 }
 
-bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOperationState &proto,
-                                             const CatalogDatabaseLite &database) {
+bool AggregationOperationState::ProtoIsValid(
+    const serialization::AggregationOperationState &proto,
+    const CatalogDatabaseLite &database) {
   if (!proto.IsInitialized() ||
       !database.hasRelationWithId(proto.relation_id()) ||
       (proto.aggregates_size() < 0)) {
     return false;
   }
 
-  std::size_t num_distinctify_hash_tables = proto.distinctify_hash_table_impl_types_size();
+  std::size_t num_distinctify_hash_tables =
+      proto.distinctify_hash_table_impl_types_size();
   std::size_t distinctify_hash_table_impl_type_index = 0;
   for (int i = 0; i < proto.aggregates_size(); ++i) {
-    if (!AggregateFunctionFactory::ProtoIsValid(proto.aggregates(i).function())) {
+    if (!AggregateFunctionFactory::ProtoIsValid(
+            proto.aggregates(i).function())) {
       return false;
     }
 
@@ -282,16 +294,18 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
     for (int argument_idx = 0;
          argument_idx < proto.aggregates(i).argument_size();
          ++argument_idx) {
-      if (!ScalarFactory::ProtoIsValid(proto.aggregates(i).argument(argument_idx),
-                                       database)) {
+      if (!ScalarFactory::ProtoIsValid(
+              proto.aggregates(i).argument(argument_idx), database)) {
         return false;
       }
     }
 
     if (proto.aggregates(i).is_distinct()) {
-      if (distinctify_hash_table_impl_type_index >= num_distinctify_hash_tables ||
+      if (distinctify_hash_table_impl_type_index >=
+              num_distinctify_hash_tables ||
           !serialization::HashTableImplType_IsValid(
-              proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))) {
+              proto.distinctify_hash_table_impl_types(
+                  distinctify_hash_table_impl_type_index))) {
         return false;
       }
     }
@@ -304,8 +318,9 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
   }
 
   if (proto.group_by_expressions_size() > 0) {
-    if (!proto.has_hash_table_impl_type()
-        || !serialization::HashTableImplType_IsValid(proto.hash_table_impl_type())) {
+    if (!proto.has_hash_table_impl_type() ||
+        !serialization::HashTableImplType_IsValid(
+            proto.hash_table_impl_type())) {
       return false;
     }
   }
@@ -327,7 +342,8 @@ void AggregationOperationState::aggregateBlock(const block_id input_block) {
   }
 }
 
-void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeAggregate(
+    InsertDestination *output_destination) {
   if (group_by_list_.empty()) {
     finalizeSingleState(output_destination);
   } else {
@@ -346,19 +362,19 @@ void AggregationOperationState::mergeSingleState(
   }
 }
 
-void AggregationOperationState::aggregateBlockSingleState(const block_id input_block) {
+void AggregationOperationState::aggregateBlockSingleState(
+    const block_id input_block) {
   // Aggregate per-block state for each aggregate.
   std::vector<std::unique_ptr<AggregationState>> local_state;
 
-  BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+  BlockReference block(
+      storage_manager_->getBlock(input_block, input_relation_));
 
   // If there is a filter predicate, 'reuse_matches' holds the set of matching
   // tuples so that it can be reused across multiple aggregates (i.e. we only
   // pay the cost of evaluating the predicate once).
   std::unique_ptr<TupleIdSequence> reuse_matches;
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
     // If all arguments are attributes of the input relation, elide a copy.
@@ -381,12 +397,11 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
       local_state.emplace_back(nullptr);
     } else {
       // Call StorageBlock::aggregate() to actually do the aggregation.
-      local_state.emplace_back(
-          block->aggregate(*handles_[agg_idx],
-                           arguments_[agg_idx],
-                           local_arguments_as_attributes,
-                           predicate_.get(),
-                           &reuse_matches));
+      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
+                                                arguments_[agg_idx],
+                                                local_arguments_as_attributes,
+                                                predicate_.get(),
+                                                &reuse_matches));
     }
   }
 
@@ -394,8 +409,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
   mergeSingleState(local_state);
 }
 
-void AggregationOperationState::aggregateBlockHashTable(const block_id input_block) {
-  BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+void AggregationOperationState::aggregateBlockHashTable(
+    const block_id input_block) {
+  BlockReference block(
+      storage_manager_->getBlock(input_block, input_relation_));
 
   // If there is a filter predicate, 'reuse_matches' holds the set of matching
   // tuples so that it can be reused across multiple aggregates (i.e. we only
@@ -407,11 +424,10 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
   // GROUP BY expressions once).
   std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
 
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
+      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
+      // expression
       // values and the aggregation arguments together as keys directly into the
       // (threadsafe) shared global distinctify HashTable for this aggregate.
       block->aggregateDistinct(*handles_[agg_idx],
@@ -429,7 +445,8 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
   // directly into the (threadsafe) shared global HashTable for this
   // aggregate.
   DCHECK(group_by_hashtable_pools_[0] != nullptr);
-  AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+  AggregationStateHashTableBase *agg_hash_table =
+      group_by_hashtable_pools_[0]->getHashTableFast();
   DCHECK(agg_hash_table != nullptr);
   block->aggregateGroupByFast(arguments_,
                               group_by_list_,
@@ -440,32 +457,35 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
   group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
 }
 
-void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeSingleState(
+    InsertDestination *output_destination) {
   // Simply build up a Tuple from the finalized values for each aggregate and
   // insert it in '*output_destination'.
   std::vector<TypedValue> attribute_values;
 
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
       single_states_[agg_idx].reset(
-          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx]));
+          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
+              *distinctify_hashtables_[agg_idx]));
     }
 
-    attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx]));
+    attribute_values.emplace_back(
+        handles_[agg_idx]->finalize(*single_states_[agg_idx]));
   }
 
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
-void AggregationOperationState::mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                                       AggregationStateHashTableBase *dst) {
-    HashTableMergerFast merger(dst);
-    (static_cast<FastHashTable<true, false, true, false> *>(src))->forEachCompositeKeyFast(&merger);
+void AggregationOperationState::mergeGroupByHashTables(
+    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
+  HashTableMergerFast merger(dst);
+  (static_cast<FastHashTable<true, false, true, false> *>(src))
+      ->forEachCompositeKeyFast(&merger);
 }
 
-void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeHashTable(
+    InsertDestination *output_destination) {
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
@@ -483,17 +503,14 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
          hash_table_index < static_cast<int>(hash_tables->size() - 1);
          ++hash_table_index) {
       // Merge each hash table to the last hash table.
-      mergeGroupByHashTables(
-          (*hash_tables)[hash_table_index].get(),
-          hash_tables->back().get());
+      mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
+                             hash_tables->back().get());
     }
   }
 
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
       DCHECK(group_by_hashtable_pools_[0] != nullptr);
       auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
@@ -502,18 +519,17 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
         // We may have a case where hash_tables is empty, e.g. no input blocks.
         // However for aggregateOnDistinctifyHashTableForGroupBy to work
         // correctly, we should create an empty group by hash table.
-        AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+        AggregationStateHashTableBase *new_hash_table =
+            group_by_hashtable_pools_[0]->getHashTableFast();
         group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
         hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
       }
       DCHECK(hash_tables->back() != nullptr);
       AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
       DCHECK(agg_hash_table != nullptr);
-      handles_[agg_idx]->AllowUpdate();
+      handles_[agg_idx]->allowUpdate();
       handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
-          *distinctify_hashtables_[agg_idx],
-          agg_hash_table,
-          agg_idx);
+          *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
     }
 
     auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
@@ -522,16 +538,15 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
       // We may have a case where hash_tables is empty, e.g. no input blocks.
       // However for aggregateOnDistinctifyHashTableForGroupBy to work
       // correctly, we should create an empty group by hash table.
-      AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTable();
+      AggregationStateHashTableBase *new_hash_table =
+          group_by_hashtable_pools_[0]->getHashTable();
       group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
       hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
     }
     AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
     DCHECK(agg_hash_table != nullptr);
-    ColumnVector* agg_result_col =
-        handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
-                                             &group_by_keys,
-                                              agg_idx);
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *agg_hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
@@ -549,16 +564,20 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
   for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
     const Type &group_by_type = group_by_element->getType();
     if (NativeColumnVector::UsableForType(group_by_type)) {
-      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+      NativeColumnVector *element_cv =
+          new NativeColumnVector(group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     } else {
-      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      IndirectColumnVector *element_cv =
+          new IndirectColumnVector(group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c123bd49/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index d408c22..7956bc6 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -102,16 +102,17 @@ class AggregationOperationState {
    *        tables. Single aggregation state (when GROUP BY list is not
    *        specified) is not allocated using memory from storage manager.
    */
-  AggregationOperationState(const CatalogRelationSchema &input_relation,
-                            const std::vector<const AggregateFunction*> &aggregate_functions,
-                            std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
-                            std::vector<bool> &&is_distinct,
-                            std::vector<std::unique_ptr<const Scalar>> &&group_by,
-                            const Predicate *predicate,
-                            const std::size_t estimated_num_entries,
-                            const HashTableImplType hash_table_impl_type,
-                            const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
-                            StorageManager *storage_manager);
+  AggregationOperationState(
+      const CatalogRelationSchema &input_relation,
+      const std::vector<const AggregateFunction *> &aggregate_functions,
+      std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+      std::vector<bool> &&is_distinct,
+      std::vector<std::unique_ptr<const Scalar>> &&group_by,
+      const Predicate *predicate,
+      const std::size_t estimated_num_entries,
+      const HashTableImplType hash_table_impl_type,
+      const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
+      StorageManager *storage_manager);
 
   ~AggregationOperationState() {}
 
@@ -143,8 +144,9 @@ class AggregationOperationState {
    *        in.
    * @return Whether proto is fully-formed and valid.
    **/
-  static bool ProtoIsValid(const serialization::AggregationOperationState &proto,
-                           const CatalogDatabaseLite &database);
+  static bool ProtoIsValid(
+      const serialization::AggregationOperationState &proto,
+      const CatalogDatabaseLite &database);
 
   /**
    * @brief Compute aggregates on the tuples of the given storage block,
@@ -165,12 +167,16 @@ class AggregationOperationState {
    **/
   void finalizeAggregate(InsertDestination *output_destination);
 
+  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                                     AggregationStateHashTableBase *dst);
+
   int dflag;
 
  private:
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
-  void mergeSingleState(const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  void mergeSingleState(
+      const std::vector<std::unique_ptr<AggregationState>> &local_state);
 
   // Aggregate on input block.
   void aggregateBlockSingleState(const block_id input_block);
@@ -187,7 +193,7 @@ class AggregationOperationState {
 
   // Each individual aggregate in this operation has an AggregationHandle and
   // some number of Scalar arguments.
-//  std::vector<std::unique_ptr<AggregationHandle>> handles_;
+  //  std::vector<std::unique_ptr<AggregationHandle>> handles_;
   std::vector<AggregationHandle *> handles_;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
 
@@ -196,7 +202,8 @@ class AggregationOperationState {
   std::vector<bool> is_distinct_;
 
   // Hash table for obtaining distinct (i.e. unique) arguments.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+      distinctify_hashtables_;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all an aggregate's argument expressions are simply attributes in
@@ -211,15 +218,14 @@ class AggregationOperationState {
   //
   // TODO(shoban): We should ideally store the aggregation state together in one
   // hash table to prevent multiple lookups.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+      group_by_hashtables_;
 
   // A vector of group by hash table pools, one for each group by clause.
   std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
 
   StorageManager *storage_manager_;
 
-  void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst);
-
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
 };
 

Reply via email to