Add GROUP BY aggregation with partitioning - Added a function to compute a composite hash over multiple attributes. - Changed the Tuple class to support partitioning. - Added a StorageBlock function that computes GROUP BY aggregates with partitioning support.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b020f203 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b020f203 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b020f203 Branch: refs/heads/partitioned-aggregation Commit: b020f2031fd5f3f0f320dbe2c630cdfb59c607a0 Parents: 2eb8c9d Author: Harshad Deshmukh <hbdeshm...@apache.org> Authored: Thu Aug 18 12:30:18 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Fri Sep 16 13:23:39 2016 -0500 ---------------------------------------------------------------------- storage/AggregationOperationState.cpp | 69 ++++++++++++------ storage/AggregationOperationState.hpp | 7 ++ storage/PartitionedHashTablePool.hpp | 4 + storage/StorageBlock.cpp | 113 +++++++++++++++++++++++++++++ storage/StorageBlock.hpp | 9 +++ types/containers/CMakeLists.txt | 1 + types/containers/Tuple.hpp | 8 ++ utility/CMakeLists.txt | 6 ++ utility/CompositeHash.hpp | 52 +++++++++++++ 9 files changed, 245 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index e50d133..c39e98a 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -68,7 +68,8 @@ AggregationOperationState::AggregationOperationState( const HashTableImplType hash_table_impl_type, const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, StorageManager *storage_manager) - : input_relation_(input_relation), + : is_aggregate_partitioned_(estimated_num_entries > kPartitionedAggregateThreshold), + input_relation_(input_relation), predicate_(predicate), 
group_by_list_(std::move(group_by)), arguments_(std::move(arguments)), @@ -194,15 +195,26 @@ AggregationOperationState::AggregationOperationState( } if (!group_by_handles.empty()) { - // Aggregation with GROUP BY: create a HashTable pool for per-group - // states. - group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( - new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - payload_sizes, - group_by_handles, - storage_manager))); + // Aggregation with GROUP BY: create a HashTable pool for per-group states. + if (!is_aggregate_partitioned_) { + group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + payload_sizes, + group_by_handles, + storage_manager))); + } + else {  // Partitioned: one hash table per partition. + partitioned_group_by_hashtable_pool_.reset( + new PartitionedHashTablePool(estimated_num_entries, + kNumPartitionsForAggregate, + hash_table_impl_type, + group_by_types, + payload_sizes, + group_by_handles, + storage_manager)); + } } } } @@ -441,20 +453,29 @@ void AggregationOperationState::aggregateBlockHashTable( } } - // Call StorageBlock::aggregateGroupBy() to aggregate this block's values - // directly into the (threadsafe) shared global HashTable for this - // aggregate. - DCHECK(group_by_hashtable_pools_[0] != nullptr); - AggregationStateHashTableBase *agg_hash_table = - group_by_hashtable_pools_[0]->getHashTableFast(); - DCHECK(agg_hash_table != nullptr); - block->aggregateGroupBy(arguments_, - group_by_list_, - predicate_.get(), - agg_hash_table, - &reuse_matches, - &reuse_group_by_vectors); - group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table); + if (!is_aggregate_partitioned_) { + // Call StorageBlock::aggregateGroupByFast() to aggregate this block's values + // directly into the (threadsafe) shared global HashTable for this + // aggregate. 
+ DCHECK(group_by_hashtable_pools_[0] != nullptr); + AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast(); + DCHECK(agg_hash_table != nullptr); + block->aggregateGroupByFast(arguments_, + group_by_list_, + predicate_.get(), + agg_hash_table, + &reuse_matches, + &reuse_group_by_vectors); + group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table); + } else {  // Partitioned: the block splits its tuples across per-partition hash tables. + block->aggregateGroupByPartitioned( + arguments_, + group_by_list_, + predicate_.get(), + &reuse_matches, + &reuse_group_by_vectors, + partitioned_group_by_hashtable_pool_.get()); + } } void AggregationOperationState::finalizeSingleState( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 66af517..7e8acb5 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -174,6 +174,11 @@ class AggregationOperationState { int dflag; private: + static constexpr std::size_t kPartitionedAggregateThreshold = 100;  // Cutoff on estimated_num_entries. + static constexpr std::size_t kNumPartitionsForAggregate = 40;  // Hash tables in the partitioned pool. + + const bool is_aggregate_partitioned_;  // estimated_num_entries > kPartitionedAggregateThreshold. + // Merge locally (per storage block) aggregated states with global aggregation // states. void mergeSingleState( @@ -225,6 +230,8 @@ class AggregationOperationState { // A vector of group by hash table pools, one for each group by clause.
std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_; + std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_; + StorageManager *storage_manager_; DISALLOW_COPY_AND_ASSIGN(AggregationOperationState); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/storage/PartitionedHashTablePool.hpp ---------------------------------------------------------------------- diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp index a71af44..7f58fa9 100644 --- a/storage/PartitionedHashTablePool.hpp +++ b/storage/PartitionedHashTablePool.hpp @@ -143,6 +143,10 @@ class PartitionedHashTablePool { return &hash_tables_; } + inline std::size_t getNumPartitions() const { + return num_partitions_; + } + private: void initializeAllHashTables() { for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/storage/StorageBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp index ec5990f..bb43630 100644 --- a/storage/StorageBlock.cpp +++ b/storage/StorageBlock.cpp @@ -41,6 +41,7 @@ #include "storage/IndexSubBlock.hpp" #include "storage/InsertDestinationInterface.hpp" #include "storage/PackedRowStoreTupleStorageSubBlock.hpp" +#include "storage/PartitionedHashTablePool.hpp" #include "storage/SMAIndexSubBlock.hpp" #include "storage/SplitRowStoreTupleStorageSubBlock.hpp" #include "storage/StorageBlockBase.hpp" @@ -1369,4 +1370,116 @@ const std::size_t StorageBlock::getNumTuples() const { return tuple_store_->numTuples(); } +void StorageBlock::aggregateGroupByPartitioned( + const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, + const std::vector<std::unique_ptr<const Scalar>> &group_by, + const Predicate *predicate, + std::unique_ptr<TupleIdSequence> *reuse_matches, + 
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors, + PartitionedHashTablePool *hashtable_pool) const { + DCHECK_EQ(group_by.size(), 0u) + << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions"; + + SubBlocksReference sub_blocks_ref(*tuple_store_, + indices_, + indices_consistent_); + + // IDs of 'arguments' as attributes in the ValueAccessor we create below. + std::vector<attribute_id> arg_ids; + std::vector<std::vector<attribute_id>> argument_ids; + + // IDs of GROUP BY key element(s) in the ValueAccessor we create below. + std::vector<attribute_id> key_ids; + + // An intermediate ValueAccessor that stores the materialized 'arguments' for + // this aggregate, as well as the GROUP BY expression values. + ColumnVectorsValueAccessor temp_result; + std::unique_ptr<ValueAccessor> accessor; + if (predicate) { + if (!*reuse_matches) { + // If there is a filter predicate that hasn't already been evaluated, + // evaluate it now and save the results for other aggregates on this + // same block. + reuse_matches->reset(getMatchesForPredicate(predicate)); + } + + // Create a filtered ValueAccessor that only iterates over predicate + // matches. + accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); + } else { + // Create a ValueAccessor that iterates over all tuples in this block + accessor.reset(tuple_store_->createValueAccessor()); + } + + attribute_id attr_id = 0; + + // First, put GROUP BY keys into 'temp_result'. + if (reuse_group_by_vectors->empty()) { + // Compute GROUP BY values from group_by Scalars, and store them in + // reuse_group_by_vectors for reuse by other aggregates on this same + // block. 
+ reuse_group_by_vectors->reserve(group_by.size()); + for (const std::unique_ptr<const Scalar> &group_by_element : group_by) { + reuse_group_by_vectors->emplace_back( + group_by_element->getAllValues(accessor.get(), &sub_blocks_ref)); + temp_result.addColumn(reuse_group_by_vectors->back().get(), false); + key_ids.push_back(attr_id++); + } + } else { + // Reuse precomputed GROUP BY values from reuse_group_by_vectors. + DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size()) + << "Wrong number of reuse_group_by_vectors"; + for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) { + temp_result.addColumn(reuse_cv.get(), false); + key_ids.push_back(attr_id++); + } + } + + // Compute argument vectors and add them to 'temp_result'. + for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) { + arg_ids.clear(); + for (const std::unique_ptr<const Scalar> &args : argument) { + temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref)); + arg_ids.push_back(attr_id++); + } + argument_ids.push_back(arg_ids); + } + + // Compute the partitions for the tuple formed by group by values. + std::vector<std::unique_ptr<TupleIdSequence>> partition_membership; + partition_membership.resize(hashtable_pool->getNumPartitions()); + + // Create a tuple-id sequence for each partition. + for (std::size_t partition = 0; + partition < hashtable_pool->getNumPartitions(); + ++partition) { + partition_membership[partition].reset(new TupleIdSequence(temp_result.getEndPosition())); + } + + // Iterate over ValueAccessor for each tuple, + // set a bit in the appropriate TupleIdSequence. 
+ temp_result.beginIteration(); + while (temp_result.next()) { + const std::size_t curr_tuple_partition_id = + temp_result.getTupleWithAttributes(key_ids)->getTupleHash() % + hashtable_pool->getNumPartitions(); + partition_membership[curr_tuple_partition_id]->set( + temp_result.getCurrentPosition(), true); + } + // For each partition, create an adapter around Value Accessor and + // TupleIdSequence. + std::vector<std::unique_ptr< + TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter; + adapter.resize(hashtable_pool->getNumPartitions()); + for (std::size_t partition = 0; + partition < hashtable_pool->getNumPartitions(); + ++partition) { + adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter( + *partition_membership[partition])); + hashtable_pool->getHashTable(partition) + ->upsertValueAccessorCompositeKeyFast( + argument_ids, adapter[partition].get(), key_ids, true); + } +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/storage/StorageBlock.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp index 398008e..4be91c6 100644 --- a/storage/StorageBlock.hpp +++ b/storage/StorageBlock.hpp @@ -44,6 +44,7 @@ class AggregationState; class CatalogRelationSchema; class ColumnVector; class InsertDestinationInterface; +class PartitionedHashTablePool; class Predicate; class Scalar; class StorageBlockLayout; @@ -467,6 +468,14 @@ class StorageBlock : public StorageBlockBase { std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; + void aggregateGroupByPartitioned( + const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, + const std::vector<std::unique_ptr<const Scalar>> &group_by, + const Predicate *predicate, + std::unique_ptr<TupleIdSequence> *reuse_matches, + std::vector<std::unique_ptr<ColumnVector>> 
*reuse_group_by_vectors, + PartitionedHashTablePool *hashtable_pool) const; + /** * @brief Inserts the GROUP BY expressions and aggregation arguments together * as keys into the distinctify hash table. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/types/containers/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt index aacb63a..c2a6623 100644 --- a/types/containers/CMakeLists.txt +++ b/types/containers/CMakeLists.txt @@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple quickstep_catalog_CatalogTypedefs quickstep_types_TypedValue quickstep_types_containers_Tuple_proto + quickstep_utility_CompositeHash quickstep_utility_Macros) target_link_libraries(quickstep_types_containers_Tuple_proto quickstep_types_TypedValue_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/types/containers/Tuple.hpp ---------------------------------------------------------------------- diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp index 60f832c..6237d54 100644 --- a/types/containers/Tuple.hpp +++ b/types/containers/Tuple.hpp @@ -28,6 +28,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "types/TypedValue.hpp" #include "types/containers/Tuple.pb.h" +#include "utility/CompositeHash.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -218,6 +219,13 @@ class Tuple { return attribute_values_.size(); } + /** + * @brief Get the hash of the tuple: HashCompositeKey() over all attribute values.
+ **/ + std::size_t getTupleHash() const { + return HashCompositeKey(attribute_values_);  // DCHECKs that the tuple is non-empty. + } + private: /** * @brief Constructor which does not create any attributes, nor pre-reserve http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/utility/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt index ddaae45..4fb6e5b 100644 --- a/utility/CMakeLists.txt +++ b/utility/CMakeLists.txt @@ -167,6 +167,7 @@ add_library(quickstep_utility_BloomFilter_proto add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp) add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp) add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp) +add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp) add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp) add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp) add_library(quickstep_utility_ExecutionDAGVisualizer @@ -227,6 +228,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory glog) target_link_libraries(quickstep_utility_CheckSnprintf glog) +target_link_libraries(quickstep_utility_CompositeHash + quickstep_types_TypedValue + quickstep_utility_HashPair + glog) target_link_libraries(quickstep_utility_DAG glog quickstep_utility_Macros) @@ -318,6 +323,7 @@ target_link_libraries(quickstep_utility quickstep_utility_CalculateInstalledMemory quickstep_utility_Cast quickstep_utility_CheckSnprintf + quickstep_utility_CompositeHash quickstep_utility_DAG quickstep_utility_EqualsAnyConstant quickstep_utility_ExecutionDAGVisualizer http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b020f203/utility/CompositeHash.hpp ---------------------------------------------------------------------- diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp new file mode
100644 index 0000000..517bc96 --- /dev/null +++ b/utility/CompositeHash.hpp @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_ +#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_ + +#include <cstddef> +#include <vector> + +#include "types/TypedValue.hpp" +#include "utility/HashPair.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** + * @brief Compute the hash value of a composite key. + * + * @param key The non-empty vector of TypedValues forming the composite key. + * @return The element hashes folded left-to-right with CombineHashes(). + **/ +inline std::size_t HashCompositeKey(const std::vector<TypedValue> &key) { + DCHECK(!key.empty()); + std::size_t hash = key.front().getHash(); + for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1; + key_it != key.end(); + ++key_it) { + hash = CombineHashes(hash, key_it->getHash()); + } + return hash; +} + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_