http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c123bd49/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp index 0e35151..1d1c084 100644 --- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp @@ -28,6 +28,8 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationHandleSum.hpp" #include "expressions/aggregation/AggregationID.hpp" +#include "storage/AggregationOperationState.hpp" +#include "storage/FastHashTableFactory.hpp" #include "storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DatetimeIntervalType.hpp" @@ -52,51 +54,56 @@ namespace quickstep { -class AggregationHandleSumTest : public::testing::Test { +class AggregationHandleSumTest : public ::testing::Test { protected: static const int kNumSamples = 1000; // Helper method that calls AggregationHandleSum::iterateUnaryInl() to // aggregate 'value' into '*state'. void iterateHandle(AggregationState *state, const TypedValue &value) { - static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_).iterateUnaryInl( - static_cast<AggregationStateSum*>(state), - value); + static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_) + .iterateUnaryInl(static_cast<AggregationStateSum *>(state), value); } void initializeHandle(const Type &type) { aggregation_handle_sum_.reset( - AggregateFunctionFactory::Get(AggregationID::kSum).createHandle( - std::vector<const Type*>(1, &type))); + AggregateFunctionFactory::Get(AggregationID::kSum) + .createHandle(std::vector<const Type *>(1, &type))); aggregation_handle_sum_state_.reset( aggregation_handle_sum_->createInitialState()); } static bool ApplyToTypesTest(TypeID typeID) { - const Type &type = (typeID == kChar || typeID == kVarChar) ? - TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) : - TypeFactory::GetType(typeID); + const Type &type = + (typeID == kChar || typeID == kVarChar) + ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) + : TypeFactory::GetType(typeID); - return AggregateFunctionFactory::Get(AggregationID::kSum).canApplyToTypes( - std::vector<const Type*>(1, &type)); + return AggregateFunctionFactory::Get(AggregationID::kSum) + .canApplyToTypes(std::vector<const Type *>(1, &type)); } static bool ResultTypeForArgumentTypeTest(TypeID input_type_id, TypeID output_type_id) { - const Type *result_type - = AggregateFunctionFactory::Get(AggregationID::kSum).resultTypeForArgumentTypes( - std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id))); + const Type *result_type = + AggregateFunctionFactory::Get(AggregationID::kSum) + .resultTypeForArgumentTypes(std::vector<const Type *>( + 1, &TypeFactory::GetType(input_type_id))); return (result_type->getTypeID() == output_type_id); } template <typename CppType> - static void CheckSumValue( - CppType expected, - const AggregationHandle &target, - const AggregationState &state) { + static void CheckSumValue(CppType expected, + const AggregationHandle &target, + const AggregationState &state) { EXPECT_EQ(expected, target.finalize(state).getLiteral<CppType>()); } + template <typename CppType> + static void CheckSumValue(CppType expected, const TypedValue &value) { + EXPECT_EQ(expected, value.getLiteral<CppType>()); + } + // Static templated method to set a meaningful to data types. template <typename CppType> static void SetDataType(int value, CppType *data) { @@ -108,7 +115,9 @@ class AggregationHandleSumTest : public::testing::Test { const GenericType &type = GenericType::Instance(true); initializeHandle(type); - EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull()); + EXPECT_TRUE( + aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_) + .isNull()); typename GenericType::cpptype val; typename PrecisionType::cpptype sum; @@ -119,13 +128,14 @@ class AggregationHandleSumTest : public::testing::Test { if (type.getTypeID() == kInt || type.getTypeID() == kLong) { SetDataType(i - 10, &val); } else { - SetDataType(static_cast<float>(i - 10)/10, &val); + SetDataType(static_cast<float>(i - 10) / 10, &val); } iterateHandle(aggregation_handle_sum_state_.get(), type.makeValue(&val)); sum += val; } iterateHandle(aggregation_handle_sum_state_.get(), type.makeNullValue()); - CheckSumValue<typename PrecisionType::cpptype>(sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_); + CheckSumValue<typename PrecisionType::cpptype>( + sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_); // Test mergeStates(). std::unique_ptr<AggregationState> merge_state( @@ -138,7 +148,7 @@ class AggregationHandleSumTest : public::testing::Test { if (type.getTypeID() == kInt || type.getTypeID() == kLong) { SetDataType(i - 10, &val); } else { - SetDataType(static_cast<float>(i - 10)/10, &val); + SetDataType(static_cast<float>(i - 10) / 10, &val); } iterateHandle(merge_state.get(), type.makeValue(&val)); sum += val; @@ -146,13 +156,11 @@ class AggregationHandleSumTest : public::testing::Test { aggregation_handle_sum_->mergeStates(*merge_state, aggregation_handle_sum_state_.get()); CheckSumValue<typename PrecisionType::cpptype>( - sum, - *aggregation_handle_sum_, - *aggregation_handle_sum_state_); + sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_); } template <typename GenericType, typename Output> - ColumnVector *createColumnVectorGeneric(const Type &type, Output *sum) { + ColumnVector* createColumnVectorGeneric(const Type &type, Output *sum) { NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3); typename GenericType::cpptype val; @@ -163,12 +171,12 @@ class AggregationHandleSumTest : public::testing::Test { if (type.getTypeID() == kInt || type.getTypeID() == kLong) { SetDataType(i - 10, &val); } else { - SetDataType(static_cast<float>(i - 10)/10, &val); + SetDataType(static_cast<float>(i - 10) / 10, &val); } column->appendTypedValue(type.makeValue(&val)); *sum += val; // One NULL in the middle. - if (i == kNumSamples/2) { + if (i == kNumSamples / 2) { column->appendTypedValue(type.makeNullValue()); } } @@ -182,12 +190,15 @@ class AggregationHandleSumTest : public::testing::Test { const GenericType &type = GenericType::Instance(true); initializeHandle(type); - EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull()); + EXPECT_TRUE( + aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_) + .isNull()); typename PrecisionType::cpptype sum; std::vector<std::unique_ptr<ColumnVector>> column_vectors; column_vectors.emplace_back( - createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(type, &sum)); + createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>( + type, &sum)); std::unique_ptr<AggregationState> cv_state( aggregation_handle_sum_->accumulateColumnVectors(column_vectors)); @@ -195,15 +206,12 @@ class AggregationHandleSumTest : public::testing::Test { // Test the state generated directly by accumulateColumnVectors(), and also // test after merging back. CheckSumValue<typename PrecisionType::cpptype>( - sum, - *aggregation_handle_sum_, - *cv_state); + sum, *aggregation_handle_sum_, *cv_state); - aggregation_handle_sum_->mergeStates(*cv_state, aggregation_handle_sum_state_.get()); + aggregation_handle_sum_->mergeStates(*cv_state, + aggregation_handle_sum_state_.get()); CheckSumValue<typename PrecisionType::cpptype>( - sum, - *aggregation_handle_sum_, - *aggregation_handle_sum_state_); + sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_); } #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -212,29 +220,30 @@ class AggregationHandleSumTest : public::testing::Test { const GenericType &type = GenericType::Instance(true); initializeHandle(type); - EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull()); + EXPECT_TRUE( + aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_) + .isNull()); typename PrecisionType::cpptype sum; - std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor()); + std::unique_ptr<ColumnVectorsValueAccessor> accessor( + new ColumnVectorsValueAccessor()); accessor->addColumn( - createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(type, &sum)); + createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>( + type, &sum)); std::unique_ptr<AggregationState> va_state( - aggregation_handle_sum_->accumulateValueAccessor(accessor.get(), - std::vector<attribute_id>(1, 0))); + aggregation_handle_sum_->accumulateValueAccessor( + accessor.get(), std::vector<attribute_id>(1, 0))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. CheckSumValue<typename PrecisionType::cpptype>( - sum, - *aggregation_handle_sum_, - *va_state); + sum, *aggregation_handle_sum_, *va_state); - aggregation_handle_sum_->mergeStates(*va_state, aggregation_handle_sum_state_.get()); + aggregation_handle_sum_->mergeStates(*va_state, + aggregation_handle_sum_state_.get()); CheckSumValue<typename PrecisionType::cpptype>( - sum, - *aggregation_handle_sum_, - *aggregation_handle_sum_state_); + sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_); } #endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -247,9 +256,7 @@ const int AggregationHandleSumTest::kNumSamples; template <> void AggregationHandleSumTest::CheckSumValue<float>( - float val, - const AggregationHandle &handle, - const AggregationState &state) { + float val, const AggregationHandle &handle, const AggregationState &state) { EXPECT_FLOAT_EQ(val, handle.finalize(state).getLiteral<float>()); } @@ -262,12 +269,14 @@ void AggregationHandleSumTest::CheckSumValue<double>( } template <> -void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) { +void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>( + int value, DatetimeIntervalLit *data) { data->interval_ticks = value; } template <> -void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) { +void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>( + int value, YearMonthIntervalLit *data) { data->months = value; } @@ -314,11 +323,13 @@ TEST_F(AggregationHandleSumTest, DoubleTypeColumnVectorTest) { } TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>(); + checkAggregationSumGenericColumnVector<DatetimeIntervalType, + DatetimeIntervalType>(); } TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>(); + checkAggregationSumGenericColumnVector<YearMonthIntervalType, + YearMonthIntervalType>(); } #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -339,11 +350,13 @@ TEST_F(AggregationHandleSumTest, DoubleTypeValueAccessorTest) { } TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeValueAccessorTest) { - checkAggregationSumGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>(); + checkAggregationSumGenericValueAccessor<DatetimeIntervalType, + DatetimeIntervalType>(); } TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeValueAccessorTest) { - checkAggregationSumGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>(); + checkAggregationSumGenericValueAccessor<YearMonthIntervalType, + YearMonthIntervalType>(); } #endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -373,38 +386,53 @@ TEST_F(AggregationHandleSumDeathTest, WrongTypeTest) { float float_val = 0; // Passes. - iterateHandle(aggregation_handle_sum_state_.get(), int_non_null_type.makeValue(&int_val)); + iterateHandle(aggregation_handle_sum_state_.get(), + int_non_null_type.makeValue(&int_val)); - EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), long_type.makeValue(&long_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), double_type.makeValue(&double_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), float_type.makeValue(&float_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), char_type.makeValue("asdf", 5)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), varchar_type.makeValue("asdf", 5)), ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), + long_type.makeValue(&long_val)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), + double_type.makeValue(&double_val)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), + float_type.makeValue(&float_val)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), + char_type.makeValue("asdf", 5)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), + varchar_type.makeValue("asdf", 5)), + ""); // Test mergeStates() with incorrectly typed handles. std::unique_ptr<AggregationHandle> aggregation_handle_sum_double( - AggregateFunctionFactory::Get(AggregationID::kSum).createHandle( - std::vector<const Type*>(1, &double_type))); + AggregateFunctionFactory::Get(AggregationID::kSum) + .createHandle(std::vector<const Type *>(1, &double_type))); std::unique_ptr<AggregationState> aggregation_state_sum_merge_double( aggregation_handle_sum_double->createInitialState()); - static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_double).iterateUnaryInl( - static_cast<AggregationStateSum*>(aggregation_state_sum_merge_double.get()), - double_type.makeValue(&double_val)); - EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double, - aggregation_handle_sum_state_.get()), - ""); + static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_double) + .iterateUnaryInl(static_cast<AggregationStateSum *>( + aggregation_state_sum_merge_double.get()), + double_type.makeValue(&double_val)); + EXPECT_DEATH( + aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double, + aggregation_handle_sum_state_.get()), + ""); std::unique_ptr<AggregationHandle> aggregation_handle_sum_float( - AggregateFunctionFactory::Get(AggregationID::kSum).createHandle( - std::vector<const Type*>(1, &float_type))); + AggregateFunctionFactory::Get(AggregationID::kSum) + .createHandle(std::vector<const Type *>(1, &float_type))); std::unique_ptr<AggregationState> aggregation_state_sum_merge_float( aggregation_handle_sum_float->createInitialState()); - static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_float).iterateUnaryInl( - static_cast<AggregationStateSum*>(aggregation_state_sum_merge_float.get()), - float_type.makeValue(&float_val)); - EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float, - aggregation_handle_sum_state_.get()), - ""); + static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_float) + .iterateUnaryInl(static_cast<AggregationStateSum *>( + aggregation_state_sum_merge_float.get()), + float_type.makeValue(&float_val)); + EXPECT_DEATH( + aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float, + aggregation_handle_sum_state_.get()), + ""); } #endif @@ -425,8 +453,10 @@ TEST_F(AggregationHandleSumTest, ResultTypeForArgumentTypeTest) { EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kLong)); EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble)); EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval)); + EXPECT_TRUE( + ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval)); + EXPECT_TRUE( + ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval)); } TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) { @@ -434,25 +464,28 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) { initializeHandle(long_non_null_type); storage_manager_.reset(new StorageManager("./test_sum_data")); std::unique_ptr<AggregationStateHashTableBase> source_hash_table( - aggregation_handle_sum_->createGroupByHashTable( - HashTableImplType::kSimpleScalarSeparateChaining, + AggregationStateFastHashTableFactory::CreateResizable( + HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &long_non_null_type), 10, + {aggregation_handle_sum_.get()->getPayloadSize()}, + {aggregation_handle_sum_.get()}, storage_manager_.get())); std::unique_ptr<AggregationStateHashTableBase> destination_hash_table( - aggregation_handle_sum_->createGroupByHashTable( - HashTableImplType::kSimpleScalarSeparateChaining, + AggregationStateFastHashTableFactory::CreateResizable( + HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &long_non_null_type), 10, + {aggregation_handle_sum_.get()->getPayloadSize()}, + {aggregation_handle_sum_.get()}, storage_manager_.get())); - AggregationStateHashTable<AggregationStateSum> *destination_hash_table_derived = - static_cast<AggregationStateHashTable<AggregationStateSum> *>( + AggregationStateFastHashTable *destination_hash_table_derived = + static_cast<AggregationStateFastHashTable *>( destination_hash_table.get()); - AggregationStateHashTable<AggregationStateSum> *source_hash_table_derived = - static_cast<AggregationStateHashTable<AggregationStateSum> *>( - source_hash_table.get()); + AggregationStateFastHashTable *source_hash_table_derived = + static_cast<AggregationStateFastHashTable *>(source_hash_table.get()); AggregationHandleSum *aggregation_handle_sum_derived = static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get()); @@ -471,7 +504,8 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) { const std::int64_t common_key_destination_sum = 4000; TypedValue common_key_destination_sum_val(common_key_destination_sum); - const std::int64_t merged_common_key = common_key_source_sum + common_key_destination_sum; + const std::int64_t merged_common_key = + common_key_source_sum + common_key_destination_sum; TypedValue common_key_merged_val(merged_common_key); const std::int64_t exclusive_key_source_sum = 100; @@ -496,59 +530,82 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) { // Create sum value states for keys. aggregation_handle_sum_derived->iterateUnaryInl(common_key_source_state.get(), common_key_source_sum_val); - std::int64_t actual_val = aggregation_handle_sum_->finalize(*common_key_source_state) - .getLiteral<std::int64_t>(); + std::int64_t actual_val = + aggregation_handle_sum_->finalize(*common_key_source_state) + .getLiteral<std::int64_t>(); EXPECT_EQ(common_key_source_sum_val.getLiteral<std::int64_t>(), actual_val); aggregation_handle_sum_derived->iterateUnaryInl( common_key_destination_state.get(), common_key_destination_sum_val); actual_val = aggregation_handle_sum_->finalize(*common_key_destination_state) .getLiteral<std::int64_t>(); - EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val); + EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(), + actual_val); aggregation_handle_sum_derived->iterateUnaryInl( exclusive_key_destination_state.get(), exclusive_key_destination_sum_val); actual_val = aggregation_handle_sum_->finalize(*exclusive_key_destination_state) .getLiteral<std::int64_t>(); - EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val); + EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), + actual_val); aggregation_handle_sum_derived->iterateUnaryInl( exclusive_key_source_state.get(), exclusive_key_source_sum_val); actual_val = aggregation_handle_sum_->finalize(*exclusive_key_source_state) .getLiteral<std::int64_t>(); - EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), actual_val); + EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), + actual_val); // Add the key-state pairs to the hash tables. - source_hash_table_derived->putCompositeKey(common_key, - *common_key_source_state); - destination_hash_table_derived->putCompositeKey( - common_key, *common_key_destination_state); - source_hash_table_derived->putCompositeKey(exclusive_source_key, - *exclusive_key_source_state); - destination_hash_table_derived->putCompositeKey( - exclusive_destination_key, *exclusive_key_destination_state); + unsigned char buffer[100]; + buffer[0] = '\0'; + memcpy(buffer + 1, + common_key_source_state.get()->getPayloadAddress(), + aggregation_handle_sum_.get()->getPayloadSize()); + source_hash_table_derived->putCompositeKey(common_key, buffer); + + memcpy(buffer + 1, + common_key_destination_state.get()->getPayloadAddress(), + aggregation_handle_sum_.get()->getPayloadSize()); + destination_hash_table_derived->putCompositeKey(common_key, buffer); + + memcpy(buffer + 1, + exclusive_key_source_state.get()->getPayloadAddress(), + aggregation_handle_sum_.get()->getPayloadSize()); + source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer); + + memcpy(buffer + 1, + exclusive_key_destination_state.get()->getPayloadAddress(), + aggregation_handle_sum_.get()->getPayloadSize()); + destination_hash_table_derived->putCompositeKey(exclusive_destination_key, + buffer); EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); EXPECT_EQ(2u, source_hash_table_derived->numEntries()); - aggregation_handle_sum_->mergeGroupByHashTables(*source_hash_table, - destination_hash_table.get()); + AggregationOperationState::mergeGroupByHashTables( + source_hash_table.get(), destination_hash_table.get()); EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); CheckSumValue<std::int64_t>( common_key_merged_val.getLiteral<std::int64_t>(), - *aggregation_handle_sum_derived, - *(destination_hash_table_derived->getSingleCompositeKey(common_key))); - CheckSumValue<std::int64_t>(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), - *aggregation_handle_sum_derived, - *(destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key))); - CheckSumValue<std::int64_t>(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), - *aggregation_handle_sum_derived, - *(source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key))); + aggregation_handle_sum_derived->finalizeHashTableEntryFast( + destination_hash_table_derived->getSingleCompositeKey(common_key) + + 1)); + CheckSumValue<std::int64_t>( + exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), + aggregation_handle_sum_derived->finalizeHashTableEntryFast( + destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key) + + 1)); + CheckSumValue<std::int64_t>( + exclusive_key_source_sum_val.getLiteral<std::int64_t>(), + aggregation_handle_sum_derived->finalizeHashTableEntryFast( + source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key) + + 1)); } } // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c123bd49/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 05d0636..c5f59f9 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -59,7 +59,7 @@ namespace quickstep { AggregationOperationState::AggregationOperationState( const CatalogRelationSchema &input_relation, - const std::vector<const AggregateFunction*> &aggregate_functions, + const std::vector<const AggregateFunction *> &aggregate_functions, std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments, std::vector<bool> &&is_distinct, std::vector<std::unique_ptr<const Scalar>> &&group_by, @@ -78,7 +78,7 @@ AggregationOperationState::AggregationOperationState( DCHECK(aggregate_functions.size() == arguments_.size()); // Get the types of GROUP BY expressions for creating HashTables below. - std::vector<const Type*> group_by_types; + std::vector<const Type *> group_by_types; for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { group_by_types.emplace_back(&group_by_element->getType()); } @@ -94,27 +94,29 @@ AggregationOperationState::AggregationOperationState( handles_.emplace_back(new AggregationHandleDistinct()); arguments_.push_back({}); is_distinct_.emplace_back(false); - group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( - new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - {1}, - handles_, - storage_manager))); + group_by_hashtable_pools_.emplace_back( + std::unique_ptr<HashTablePool>(new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + {1}, + handles_, + storage_manager))); } else { // Set up each individual aggregate in this operation. - std::vector<const AggregateFunction*>::const_iterator agg_func_it - = aggregate_functions.begin(); - std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it - = arguments_.begin(); + std::vector<const AggregateFunction *>::const_iterator agg_func_it = + aggregate_functions.begin(); + std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator + args_it = arguments_.begin(); std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin(); - std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it - = distinctify_hash_table_impl_types.begin(); + std::vector<HashTableImplType>::const_iterator + distinctify_hash_table_impl_types_it = + distinctify_hash_table_impl_types.begin(); std::vector<std::size_t> payload_sizes; - for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) { + for (; agg_func_it != aggregate_functions.end(); + ++agg_func_it, ++args_it, ++is_distinct_it) { // Get the Types of this aggregate's arguments so that we can create an // AggregationHandle. - std::vector<const Type*> argument_types; + std::vector<const Type *> argument_types; for (const std::unique_ptr<const Scalar> &argument : *args_it) { argument_types.emplace_back(&argument->getType()); } @@ -129,12 +131,13 @@ AggregationOperationState::AggregationOperationState( handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); if (!group_by_list_.empty()) { - // Aggregation with GROUP BY: combined payload is partially updated in the presence of DISTINCT. - if (*is_distinct_it) { - handles_.back()->BlockUpdate(); - } - group_by_handles.emplace_back(handles_.back()); - payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize()); + // Aggregation with GROUP BY: combined payload is partially updated in + // the presence of DISTINCT. + if (*is_distinct_it) { + handles_.back()->blockUpdate(); + } + group_by_handles.emplace_back(handles_.back()); + payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize()); } else { // Aggregation without GROUP BY: create a single global state. single_states_.emplace_back(handles_.back()->createInitialState()); @@ -146,31 +149,38 @@ AggregationOperationState::AggregationOperationState( std::vector<attribute_id> local_arguments_as_attributes; local_arguments_as_attributes.reserve(args_it->size()); for (const std::unique_ptr<const Scalar> &argument : *args_it) { - const attribute_id argument_id = argument->getAttributeIdForValueAccessor(); + const attribute_id argument_id = + argument->getAttributeIdForValueAccessor(); if (argument_id == -1) { local_arguments_as_attributes.clear(); break; } else { - DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor()); + DCHECK_EQ(input_relation_.getID(), + argument->getRelationIdForValueAccessor()); local_arguments_as_attributes.push_back(argument_id); } } - arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes)); + arguments_as_attributes_.emplace_back( + std::move(local_arguments_as_attributes)); #endif } - // Initialize the corresponding distinctify hash table if this is a DISTINCT + // Initialize the corresponding distinctify hash table if this is a + // DISTINCT // aggregation. if (*is_distinct_it) { - std::vector<const Type*> key_types(group_by_types); - key_types.insert(key_types.end(), argument_types.begin(), argument_types.end()); - // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating + std::vector<const Type *> key_types(group_by_types); + key_types.insert( + key_types.end(), argument_types.begin(), argument_types.end()); + // TODO(jianqiao): estimated_num_entries is quite inaccurate for + // estimating // the number of entries in the distinctify hash table. We may estimate - // for each distinct aggregation an estimated_num_distinct_keys value during + // for each distinct aggregation an estimated_num_distinct_keys value + // during // query optimization, if it worths. distinctify_hashtables_.emplace_back( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateFastHashTableFactory::CreateResizable( *distinctify_hash_table_impl_types_it, key_types, estimated_num_entries, @@ -184,16 +194,17 @@ AggregationOperationState::AggregationOperationState( } if (!group_by_handles.empty()) { - // Aggregation with GROUP BY: create a HashTable pool for per-group states. + // Aggregation with GROUP BY: create a HashTable pool for per-group + // states. group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( - new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - payload_sizes, - group_by_handles, - storage_manager))); - } + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + payload_sizes, + group_by_handles, + storage_manager))); } + } } AggregationOperationState* AggregationOperationState::ReconstructFromProto( @@ -203,7 +214,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( DCHECK(ProtoIsValid(proto, database)); // Rebuild contructor arguments from their representation in 'proto'. - std::vector<const AggregateFunction*> aggregate_functions; + std::vector<const AggregateFunction *> aggregate_functions; std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments; std::vector<bool> is_distinct; std::vector<HashTableImplType> distinctify_hash_table_impl_types; @@ -216,62 +227,63 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( arguments.emplace_back(); arguments.back().reserve(agg_proto.argument_size()); - for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); ++argument_idx) { + for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); + ++argument_idx) { arguments.back().emplace_back(ScalarFactory::ReconstructFromProto( - agg_proto.argument(argument_idx), - database)); + agg_proto.argument(argument_idx), database)); } is_distinct.emplace_back(agg_proto.is_distinct()); if (agg_proto.is_distinct()) { distinctify_hash_table_impl_types.emplace_back( - HashTableImplTypeFromProto( - proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))); + HashTableImplTypeFromProto(proto.distinctify_hash_table_impl_types( + distinctify_hash_table_impl_type_index))); ++distinctify_hash_table_impl_type_index; } } std::vector<std::unique_ptr<const Scalar>> group_by_expressions; - for (int group_by_idx = 0; - group_by_idx < proto.group_by_expressions_size(); + for (int group_by_idx = 0; group_by_idx < proto.group_by_expressions_size(); ++group_by_idx) { group_by_expressions.emplace_back(ScalarFactory::ReconstructFromProto( - proto.group_by_expressions(group_by_idx), - database)); + proto.group_by_expressions(group_by_idx), database)); } unique_ptr<Predicate> predicate; if (proto.has_predicate()) { predicate.reset( - PredicateFactory::ReconstructFromProto(proto.predicate(), - database)); + PredicateFactory::ReconstructFromProto(proto.predicate(), database)); } - return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()), - aggregate_functions, - std::move(arguments), - std::move(is_distinct), - std::move(group_by_expressions), - predicate.release(), - proto.estimated_num_entries(), - HashTableImplTypeFromProto(proto.hash_table_impl_type()), - distinctify_hash_table_impl_types, - storage_manager); + return new AggregationOperationState( + database.getRelationSchemaById(proto.relation_id()), + aggregate_functions, + std::move(arguments), + std::move(is_distinct), + std::move(group_by_expressions), + predicate.release(), + proto.estimated_num_entries(), + HashTableImplTypeFromProto(proto.hash_table_impl_type()), + distinctify_hash_table_impl_types, + storage_manager); } -bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOperationState &proto, - const CatalogDatabaseLite &database) { +bool AggregationOperationState::ProtoIsValid( + const serialization::AggregationOperationState &proto, + const CatalogDatabaseLite &database) { if (!proto.IsInitialized() || !database.hasRelationWithId(proto.relation_id()) || (proto.aggregates_size() < 0)) { return false; } - std::size_t num_distinctify_hash_tables = proto.distinctify_hash_table_impl_types_size(); + std::size_t num_distinctify_hash_tables = + proto.distinctify_hash_table_impl_types_size(); std::size_t distinctify_hash_table_impl_type_index = 0; for (int i = 0; i < proto.aggregates_size(); ++i) { - if (!AggregateFunctionFactory::ProtoIsValid(proto.aggregates(i).function())) { + if (!AggregateFunctionFactory::ProtoIsValid( + proto.aggregates(i).function())) { return false; } @@ -282,16 +294,18 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe for (int argument_idx = 0; argument_idx < proto.aggregates(i).argument_size(); ++argument_idx) { - if (!ScalarFactory::ProtoIsValid(proto.aggregates(i).argument(argument_idx), - database)) { + if (!ScalarFactory::ProtoIsValid( + proto.aggregates(i).argument(argument_idx), database)) { return false; } } if (proto.aggregates(i).is_distinct()) { - if (distinctify_hash_table_impl_type_index >= num_distinctify_hash_tables || + if (distinctify_hash_table_impl_type_index >= + num_distinctify_hash_tables || !serialization::HashTableImplType_IsValid( - proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))) { + proto.distinctify_hash_table_impl_types( + distinctify_hash_table_impl_type_index))) { return false; } } @@ -304,8 +318,9 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe } if (proto.group_by_expressions_size() > 0) { - if (!proto.has_hash_table_impl_type() - || !serialization::HashTableImplType_IsValid(proto.hash_table_impl_type())) { + if (!proto.has_hash_table_impl_type() || + !serialization::HashTableImplType_IsValid( + proto.hash_table_impl_type())) { return false; } } @@ -327,7 +342,8 @@ void AggregationOperationState::aggregateBlock(const block_id input_block) { } } -void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) { +void AggregationOperationState::finalizeAggregate( + InsertDestination *output_destination) { if (group_by_list_.empty()) { finalizeSingleState(output_destination); } else { @@ -346,19 +362,19 @@ void AggregationOperationState::mergeSingleState( } } -void AggregationOperationState::aggregateBlockSingleState(const block_id input_block) { +void AggregationOperationState::aggregateBlockSingleState( + const block_id input_block) { // Aggregate per-block state for each aggregate. std::vector<std::unique_ptr<AggregationState>> local_state; - BlockReference block(storage_manager_->getBlock(input_block, input_relation_)); + BlockReference block( + storage_manager_->getBlock(input_block, input_relation_)); // If there is a filter predicate, 'reuse_matches' holds the set of matching // tuples so that it can be reused across multiple aggregates (i.e. we only // pay the cost of evaluating the predicate once). std::unique_ptr<TupleIdSequence> reuse_matches; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { const std::vector<attribute_id> *local_arguments_as_attributes = nullptr; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all arguments are attributes of the input relation, elide a copy. @@ -381,12 +397,11 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b local_state.emplace_back(nullptr); } else { // Call StorageBlock::aggregate() to actually do the aggregation. - local_state.emplace_back( - block->aggregate(*handles_[agg_idx], - arguments_[agg_idx], - local_arguments_as_attributes, - predicate_.get(), - &reuse_matches)); + local_state.emplace_back(block->aggregate(*handles_[agg_idx], + arguments_[agg_idx], + local_arguments_as_attributes, + predicate_.get(), + &reuse_matches)); } } @@ -394,8 +409,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b mergeSingleState(local_state); } -void AggregationOperationState::aggregateBlockHashTable(const block_id input_block) { - BlockReference block(storage_manager_->getBlock(input_block, input_relation_)); +void AggregationOperationState::aggregateBlockHashTable( + const block_id input_block) { + BlockReference block( + storage_manager_->getBlock(input_block, input_relation_)); // If there is a filter predicate, 'reuse_matches' holds the set of matching // tuples so that it can be reused across multiple aggregates (i.e. we only @@ -407,11 +424,10 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo // GROUP BY expressions once). std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression + // Call StorageBlock::aggregateDistinct() to insert the GROUP BY + // expression // values and the aggregation arguments together as keys directly into the // (threadsafe) shared global distinctify HashTable for this aggregate. block->aggregateDistinct(*handles_[agg_idx], @@ -429,7 +445,8 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo // directly into the (threadsafe) shared global HashTable for this // aggregate. DCHECK(group_by_hashtable_pools_[0] != nullptr); - AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast(); + AggregationStateHashTableBase *agg_hash_table = + group_by_hashtable_pools_[0]->getHashTableFast(); DCHECK(agg_hash_table != nullptr); block->aggregateGroupByFast(arguments_, group_by_list_, @@ -440,32 +457,35 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table); } -void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) { +void AggregationOperationState::finalizeSingleState( + InsertDestination *output_destination) { // Simply build up a Tuple from the finalized values for each aggregate and // insert it in '*output_destination'. std::vector<TypedValue> attribute_values; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { single_states_[agg_idx].reset( - handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx])); + handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle( + *distinctify_hashtables_[agg_idx])); } - attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx])); + attribute_values.emplace_back( + handles_[agg_idx]->finalize(*single_states_[agg_idx])); } output_destination->insertTuple(Tuple(std::move(attribute_values))); } -void AggregationOperationState::mergeGroupByHashTables(AggregationStateHashTableBase *src, - AggregationStateHashTableBase *dst) { - HashTableMergerFast merger(dst); - (static_cast<FastHashTable<true, false, true, false> *>(src))->forEachCompositeKeyFast(&merger); +void AggregationOperationState::mergeGroupByHashTables( + AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) { + HashTableMergerFast merger(dst); + (static_cast<FastHashTable<true, false, true, false> *>(src)) + ->forEachCompositeKeyFast(&merger); } -void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) { +void AggregationOperationState::finalizeHashTable( + InsertDestination *output_destination) { // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). std::vector<std::vector<TypedValue>> group_by_keys; @@ -483,17 +503,14 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest hash_table_index < static_cast<int>(hash_tables->size() - 1); ++hash_table_index) { // Merge each hash table to the last hash table. - mergeGroupByHashTables( - (*hash_tables)[hash_table_index].get(), - hash_tables->back().get()); + mergeGroupByHashTables((*hash_tables)[hash_table_index].get(), + hash_tables->back().get()); } } // Collect per-aggregate finalized values. std::vector<std::unique_ptr<ColumnVector>> final_values; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { DCHECK(group_by_hashtable_pools_[0] != nullptr); auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); @@ -502,18 +519,17 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest // We may have a case where hash_tables is empty, e.g. no input blocks. // However for aggregateOnDistinctifyHashTableForGroupBy to work // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTableFast(); + AggregationStateHashTableBase *new_hash_table = + group_by_hashtable_pools_[0]->getHashTableFast(); group_by_hashtable_pools_[0]->returnHashTable(new_hash_table); hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); } DCHECK(hash_tables->back() != nullptr); AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); DCHECK(agg_hash_table != nullptr); - handles_[agg_idx]->AllowUpdate(); + handles_[agg_idx]->allowUpdate(); handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( - *distinctify_hashtables_[agg_idx], - agg_hash_table, - agg_idx); + *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx); } auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); @@ -522,16 +538,15 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest // We may have a case where hash_tables is empty, e.g. no input blocks. // However for aggregateOnDistinctifyHashTableForGroupBy to work // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTable(); + AggregationStateHashTableBase *new_hash_table = + group_by_hashtable_pools_[0]->getHashTable(); group_by_hashtable_pools_[0]->returnHashTable(new_hash_table); hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); } AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); DCHECK(agg_hash_table != nullptr); - ColumnVector* agg_result_col = - handles_[agg_idx]->finalizeHashTable(*agg_hash_table, - &group_by_keys, - agg_idx); + ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( + *agg_hash_table, &group_by_keys, agg_idx); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); } @@ -549,16 +564,20 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { const Type &group_by_type = group_by_element->getType(); if (NativeColumnVector::UsableForType(group_by_type)) { - NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size()); + NativeColumnVector *element_cv = + new NativeColumnVector(group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } else { - IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size()); + IndirectColumnVector *element_cv = + new IndirectColumnVector(group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } ++group_by_element_idx; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c123bd49/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index d408c22..7956bc6 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -102,16 +102,17 @@ class AggregationOperationState { * tables. Single aggregation state (when GROUP BY list is not * specified) is not allocated using memory from storage manager. */ - AggregationOperationState(const CatalogRelationSchema &input_relation, - const std::vector<const AggregateFunction*> &aggregate_functions, - std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments, - std::vector<bool> &&is_distinct, - std::vector<std::unique_ptr<const Scalar>> &&group_by, - const Predicate *predicate, - const std::size_t estimated_num_entries, - const HashTableImplType hash_table_impl_type, - const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, - StorageManager *storage_manager); + AggregationOperationState( + const CatalogRelationSchema &input_relation, + const std::vector<const AggregateFunction *> &aggregate_functions, + std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments, + std::vector<bool> &&is_distinct, + std::vector<std::unique_ptr<const Scalar>> &&group_by, + const Predicate *predicate, + const std::size_t estimated_num_entries, + const HashTableImplType hash_table_impl_type, + const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, + StorageManager *storage_manager); ~AggregationOperationState() {} @@ -143,8 +144,9 @@ class AggregationOperationState { * in. * @return Whether proto is fully-formed and valid. **/ - static bool ProtoIsValid(const serialization::AggregationOperationState &proto, - const CatalogDatabaseLite &database); + static bool ProtoIsValid( + const serialization::AggregationOperationState &proto, + const CatalogDatabaseLite &database); /** * @brief Compute aggregates on the tuples of the given storage block, @@ -165,12 +167,16 @@ class AggregationOperationState { **/ void finalizeAggregate(InsertDestination *output_destination); + static void mergeGroupByHashTables(AggregationStateHashTableBase *src, + AggregationStateHashTableBase *dst); + int dflag; private: // Merge locally (per storage block) aggregated states with global aggregation // states. - void mergeSingleState(const std::vector<std::unique_ptr<AggregationState>> &local_state); + void mergeSingleState( + const std::vector<std::unique_ptr<AggregationState>> &local_state); // Aggregate on input block. void aggregateBlockSingleState(const block_id input_block); @@ -187,7 +193,7 @@ class AggregationOperationState { // Each individual aggregate in this operation has an AggregationHandle and // some number of Scalar arguments. -// std::vector<std::unique_ptr<AggregationHandle>> handles_; + // std::vector<std::unique_ptr<AggregationHandle>> handles_; std::vector<AggregationHandle *> handles_; std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_; @@ -196,7 +202,8 @@ class AggregationOperationState { std::vector<bool> is_distinct_; // Hash table for obtaining distinct (i.e. unique) arguments. - std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_; + std::vector<std::unique_ptr<AggregationStateHashTableBase>> + distinctify_hashtables_; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all an aggregate's argument expressions are simply attributes in @@ -211,15 +218,14 @@ class AggregationOperationState { // // TODO(shoban): We should ideally store the aggregation state together in one // hash table to prevent multiple lookups. - std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_; + std::vector<std::unique_ptr<AggregationStateHashTableBase>> + group_by_hashtables_; // A vector of group by hash table pools, one for each group by clause. std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_; StorageManager *storage_manager_; - void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst); - DISALLOW_COPY_AND_ASSIGN(AggregationOperationState); };