Repository: incubator-quickstep Updated Branches: refs/heads/quickstep-28-29 95a46bbe6 -> ac3512ceb (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/HashTablePool.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp index 53fe514..3cdfcb3 100644 --- a/storage/HashTablePool.hpp +++ b/storage/HashTablePool.hpp @@ -27,6 +27,8 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" +#include "storage/FastHashTable.hpp" +#include "storage/FastHashTableFactory.hpp" #include "threading/SpinMutex.hpp" #include "utility/Macros.hpp" #include "utility/StringUtil.hpp" @@ -82,6 +84,34 @@ class HashTablePool { storage_manager_(DCHECK_NOTNULL(storage_manager)) {} /** + * @brief Constructor. + * + * @note This constructor is relevant for HashTables specialized for + * aggregation. + * + * @param estimated_num_entries The maximum number of entries in a hash table. + * @param hash_table_impl_type The type of hash table implementation. + * @param group_by_types A vector of pointers to the types which form the group by + * key. + * @param payload_sizes The sizes in bytes for the AggregationStates for the + * respective AggregationHandles. + * @param handles The AggregationHandles in this query. + * @param storage_manager A pointer to the storage manager. + **/ + HashTablePool(const std::size_t estimated_num_entries, + const HashTableImplType hash_table_impl_type, + const std::vector<const Type *> &group_by_types, + const std::vector<std::size_t> &payload_sizes, + const std::vector<AggregationHandle *> &handles, + StorageManager *storage_manager) + : estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)), + hash_table_impl_type_(hash_table_impl_type), + group_by_types_(group_by_types), + payload_sizes_(payload_sizes), + handles_(handles), + storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + + /** * @brief Check out a hash table for insertion. * * @return A hash table pointer. 
@@ -101,6 +131,28 @@ class HashTablePool { } /** + * @brief Check out a hash table for insertion. + * + * @note This method is relevant for specialized (for aggregation) + * hash table implementation. + * + * @return A hash table pointer. + **/ + AggregationStateHashTableBase* getHashTableFast() { + { + SpinMutexLock lock(mutex_); + if (!hash_tables_.empty()) { + std::unique_ptr<AggregationStateHashTableBase> ret_hash_table( + std::move(hash_tables_.back())); + hash_tables_.pop_back(); + DCHECK(ret_hash_table != nullptr); + return ret_hash_table.release(); + } + } + return createNewHashTableFast(); + } + + /** * @brief Return a previously checked out hash table. * * @param hash_table A pointer to the checked out hash table. @@ -134,6 +186,16 @@ class HashTablePool { storage_manager_); } + AggregationStateHashTableBase* createNewHashTableFast() { + return AggregationStateFastHashTableFactory::CreateResizable( + hash_table_impl_type_, + group_by_types_, + estimated_num_entries_, + payload_sizes_, + handles_, + storage_manager_); + } + inline std::size_t reduceEstimatedCardinality( const std::size_t original_estimate) const { if (original_estimate < kEstimateReductionFactor) { @@ -153,7 +215,10 @@ class HashTablePool { const std::vector<const Type *> group_by_types_; + std::vector<std::size_t> payload_sizes_; + AggregationHandle *agg_handle_; + const std::vector<AggregationHandle *> handles_; StorageManager *storage_manager_; SpinMutex mutex_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/StorageBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp index 21aa12c..ec5990f 100644 --- a/storage/StorageBlock.cpp +++ b/storage/StorageBlock.cpp @@ -415,8 +415,7 @@ AggregationState* StorageBlock::aggregate( } void StorageBlock::aggregateGroupBy( - const AggregationHandle &handle, - const std::vector<std::unique_ptr<const Scalar>> &arguments, + const 
std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, const std::vector<std::unique_ptr<const Scalar>> &group_by, const Predicate *predicate, AggregationStateHashTableBase *hash_table, @@ -481,19 +480,24 @@ void StorageBlock::aggregateGroupBy( } // Compute argument vectors and add them to 'temp_result'. - for (const std::unique_ptr<const Scalar> &argument : arguments) { - temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref)); - argument_ids.push_back(attr_id++); - } + for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) { + for (const std::unique_ptr<const Scalar> &args : argument) { + temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref)); + argument_ids.push_back(attr_id++); + } + if (argument.empty()) { + argument_ids.push_back(kInvalidAttributeID); + } + } } - // Actually do aggregation into '*hash_table'. - handle.aggregateValueAccessorIntoHashTable(&temp_result, - argument_ids, - key_ids, - hash_table); + hash_table->upsertValueAccessorCompositeKeyFast(argument_ids, + &temp_result, + key_ids, + true); } + void StorageBlock::aggregateDistinct( const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, @@ -582,7 +586,6 @@ void StorageBlock::aggregateDistinct( &temp_result, key_ids, distinctify_hash_table); } - // TODO(chasseur): Vectorization for updates. StorageBlock::UpdateResult StorageBlock::update( const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/StorageBlock.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp index 97b4773..bab5bab 100644 --- a/storage/StorageBlock.hpp +++ b/storage/StorageBlock.hpp @@ -419,7 +419,6 @@ class StorageBlock : public StorageBlockBase { * @brief Perform GROUP BY aggregation on the tuples in the this storage * block. 
* - * @param handle Aggregation handle to compute aggregates with. * @param arguments The arguments to the aggregation function as Scalars. * @param group_by The list of GROUP BY attributes/expressions. The tuples in * this storage block are grouped by these attributes before @@ -459,14 +458,13 @@ class StorageBlock : public StorageBlockBase { * attributes as std::vector<attribute_id> (like in selectSimple()) for fast * path when there are no expressions specified in the query. */ - void aggregateGroupBy(const AggregationHandle &handle, - const std::vector<std::unique_ptr<const Scalar>> &arguments, - const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, - AggregationStateHashTableBase *hash_table, - std::unique_ptr<TupleIdSequence> *reuse_matches, - std::vector<std::unique_ptr<ColumnVector>> - *reuse_group_by_vectors) const; + void aggregateGroupBy( + const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, + const std::vector<std::unique_ptr<const Scalar>> &group_by, + const Predicate *predicate, + AggregationStateHashTableBase *hash_table, + std::unique_ptr<TupleIdSequence> *reuse_matches, + std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; /** * @brief Inserts the GROUP BY expressions and aggregation arguments together