ExecutionGenerator and QueryContext support for LIPFilters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b57d551c Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b57d551c Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b57d551c Branch: refs/heads/exact-filter Commit: b57d551cd58d4388804c18da1825b691ec3b3a6e Parents: 5b24791 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Wed Sep 7 13:20:43 2016 -0500 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Thu Oct 20 14:56:08 2016 -0500 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 7 +- query_execution/QueryContext.cpp | 33 ++- query_execution/QueryContext.hpp | 96 ++++++++ query_execution/QueryContext.proto | 25 +- query_optimizer/CMakeLists.txt | 21 ++ query_optimizer/ExecutionGenerator.cpp | 36 ++- query_optimizer/ExecutionGenerator.hpp | 4 + query_optimizer/LIPFilterGenerator.cpp | 194 +++++++++++++++ query_optimizer/LIPFilterGenerator.hpp | 194 +++++++++++++++ query_optimizer/QueryPlan.hpp | 32 +++ relational_operators/RelationalOperator.hpp | 12 +- utility/lip_filter/CMakeLists.txt | 55 ++++- utility/lip_filter/LIPFilter.hpp | 64 +++++ utility/lip_filter/LIPFilter.proto | 58 +++++ utility/lip_filter/LIPFilterAdaptiveProber.hpp | 243 +++++++++++++++++++ utility/lip_filter/LIPFilterBuilder.hpp | 109 +++++++++ utility/lip_filter/LIPFilterDeployment.cpp | 87 +++++++ utility/lip_filter/LIPFilterDeployment.hpp | 111 +++++++++ utility/lip_filter/LIPFilterFactory.cpp | 74 ++++++ utility/lip_filter/LIPFilterFactory.hpp | 69 ++++++ utility/lip_filter/SingleIdentityHashFilter.hpp | 185 ++++++++++++++ 21 files changed, 1684 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_execution/CMakeLists.txt 
---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index dafdea4..b5e07df 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -190,9 +190,11 @@ target_link_libraries(quickstep_queryexecution_QueryContext quickstep_types_TypedValue quickstep_types_containers_Tuple quickstep_utility_Macros - quickstep_utility_SortConfiguration) + quickstep_utility_SortConfiguration + quickstep_utility_lipfilter_LIPFilter + quickstep_utility_lipfilter_LIPFilterDeployment + quickstep_utility_lipfilter_LIPFilterFactory) target_link_libraries(quickstep_queryexecution_QueryContext_proto - quickstep_utility_BloomFilter_proto quickstep_expressions_Expressions_proto quickstep_expressions_tablegenerator_GeneratorFunction_proto quickstep_storage_AggregationOperationState_proto @@ -201,6 +203,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto quickstep_storage_WindowAggregationOperationState_proto quickstep_types_containers_Tuple_proto quickstep_utility_SortConfiguration_proto + quickstep_utility_lipfilter_LIPFilter_proto ${PROTOBUF_LIBRARY}) target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto quickstep_catalog_Catalog_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_execution/QueryContext.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp index 6612611..0e6636d 100644 --- a/query_execution/QueryContext.cpp +++ b/query_execution/QueryContext.cpp @@ -40,6 +40,9 @@ #include "types/TypedValue.hpp" #include "types/containers/Tuple.hpp" #include "utility/SortConfiguration.hpp" +#include "utility/lip_filter/LIPFilter.hpp" +#include "utility/lip_filter/LIPFilterDeployment.hpp" +#include "utility/lip_filter/LIPFilterFactory.hpp" #include "glog/logging.h" @@ -92,6 +95,18 @@ 
QueryContext::QueryContext(const serialization::QueryContext &proto, bus)); } + for (int i = 0; i < proto.lip_filters_size(); ++i) { + lip_filters_.emplace_back( + std::unique_ptr<LIPFilter>( + LIPFilterFactory::ReconstructFromProto(proto.lip_filters(i)))); + } + + for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) { + lip_deployments_.emplace_back( + std::make_unique<LIPFilterDeployment>( + proto.lip_filter_deployments(i), lip_filters_)); + } + for (int i = 0; i < proto.predicates_size(); ++i) { predicates_.emplace_back( PredicateFactory::ReconstructFromProto(proto.predicates(i), database)); @@ -151,12 +166,6 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto, } } - for (int i = 0; i < proto.bloom_filters_size(); ++i) { - if (!BloomFilter::ProtoIsValid(proto.bloom_filters(i))) { - return false; - } - } - // Each GeneratorFunctionHandle object is serialized as a function name with // a list of arguments. Here checks that the arguments are valid TypedValue's. 
for (int i = 0; i < proto.generator_functions_size(); ++i) { @@ -185,6 +194,18 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto, } } + for (int i = 0; i < proto.lip_filters_size(); ++i) { + if (!LIPFilterFactory::ProtoIsValid(proto.lip_filters(i))) { + return false; + } + } + + for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) { + if (!LIPFilterDeployment::ProtoIsValid(proto.lip_filter_deployments(i))) { + return false; + } + } + for (int i = 0; i < proto.predicates_size(); ++i) { if (!PredicateFactory::ProtoIsValid(proto.predicates(i), database)) { return false; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index 7cc44ad..1191f41 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -37,6 +37,8 @@ #include "types/containers/Tuple.hpp" #include "utility/Macros.hpp" #include "utility/SortConfiguration.hpp" +#include "utility/lip_filter/LIPFilter.hpp" +#include "utility/lip_filter/LIPFilterDeployment.hpp" #include "glog/logging.h" @@ -84,6 +86,17 @@ class QueryContext { typedef std::uint32_t join_hash_table_id; /** + * @brief A unique identifier for a LIPFilterDeployment per query. + **/ + typedef std::uint32_t lip_deployment_id; + static constexpr lip_deployment_id kInvalidILIPDeploymentId = static_cast<lip_deployment_id>(-1); + + /** + * @brief A unique identifier for a LIPFilter per query. + **/ + typedef std::uint32_t lip_filter_id; + + /** * @brief A unique identifier for a Predicate per query. * * @note A negative value indicates a null Predicate. @@ -295,6 +308,87 @@ class QueryContext { } /** + * @brief Whether the given LIPFilterDeployment id is valid. + * + * @param id The LIPFilterDeployment id. + * + * @return True if valid, otherwise false. 
+ **/ + bool isValidLIPDeploymentId(const lip_deployment_id id) const { + return id < lip_deployments_.size(); + } + + /** + * @brief Get a constant pointer to the LIPFilterDeployment. + * + * @param id The LIPFilterDeployment id. + * + * @return The constant pointer to LIPFilterDeployment that is + * already created in the constructor. + **/ + inline const LIPFilterDeployment* getLIPDeployment( + const lip_deployment_id id) const { + DCHECK_LT(id, lip_deployments_.size()); + return lip_deployments_[id].get(); + } + + /** + * @brief Destroy the given LIPFilterDeployment. + * + * @param id The id of the LIPFilterDeployment to destroy. + **/ + inline void destroyLIPDeployment(const lip_deployment_id id) { + DCHECK_LT(id, lip_deployments_.size()); + lip_deployments_[id].reset(); + } + + /** + * @brief Whether the given LIPFilter id is valid. + * + * @param id The LIPFilter id. + * + * @return True if valid, otherwise false. + **/ + bool isValidLIPFilterId(const lip_filter_id id) const { + return id < lip_filters_.size(); + } + + /** + * @brief Get a mutable pointer to the LIPFilter. + * + * @param id The LIPFilter id. + * + * @return The mutable pointer to the LIPFilter, already created in the constructor. + **/ + inline LIPFilter* getLIPFilterMutable(const lip_filter_id id) { + DCHECK_LT(id, lip_filters_.size()); + return lip_filters_[id].get(); + } + + /** + * @brief Get a constant pointer to the LIPFilter. + * + * @param id The LIPFilter id. + * + * @return The constant pointer to LIPFilter that is + * already created in the constructor. + **/ + inline const LIPFilter* getLIPFilter(const lip_filter_id id) const { + DCHECK_LT(id, lip_filters_.size()); + return lip_filters_[id].get(); + } + + /** + * @brief Destroy the given LIPFilter. + * + * @param id The id of the LIPFilter to destroy.
+ **/ + inline void destroyLIPFilter(const lip_filter_id id) { + DCHECK_LT(id, lip_filters_.size()); + lip_filters_[id].reset(); + } + + /** * @brief Whether the given Predicate id is valid or no predicate. * * @param id The Predicate id. @@ -472,6 +566,8 @@ class QueryContext { std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_; std::vector<std::unique_ptr<InsertDestination>> insert_destinations_; std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_; + std::vector<std::unique_ptr<LIPFilterDeployment>> lip_deployments_; + std::vector<std::unique_ptr<LIPFilter>> lip_filters_; std::vector<std::unique_ptr<const Predicate>> predicates_; std::vector<std::vector<std::unique_ptr<const Scalar>>> scalar_groups_; std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_execution/QueryContext.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto index 1a586a4..ab0f520 100644 --- a/query_execution/QueryContext.proto +++ b/query_execution/QueryContext.proto @@ -26,8 +26,8 @@ import "storage/HashTable.proto"; import "storage/InsertDestination.proto"; import "storage/WindowAggregationOperationState.proto"; import "types/containers/Tuple.proto"; -import "utility/BloomFilter.proto"; import "utility/SortConfiguration.proto"; +import "utility/lip_filter/LIPFilter.proto"; message QueryContext { message ScalarGroup { @@ -46,19 +46,20 @@ message QueryContext { } repeated AggregationOperationState aggregation_states = 1; - repeated BloomFilter bloom_filters = 2; - repeated GeneratorFunctionHandle generator_functions = 3; - repeated HashTable join_hash_tables = 4; - repeated InsertDestination insert_destinations = 5; - repeated Predicate predicates = 6; - repeated ScalarGroup scalar_groups = 7; - repeated SortConfiguration sort_configs = 8; - repeated 
Tuple tuples = 9; + repeated GeneratorFunctionHandle generator_functions = 2; + repeated HashTable join_hash_tables = 3; + repeated InsertDestination insert_destinations = 4; + repeated LIPFilter lip_filters = 5; + repeated LIPFilterDeployment lip_filter_deployments = 6; + repeated Predicate predicates = 7; + repeated ScalarGroup scalar_groups = 8; + repeated SortConfiguration sort_configs = 9; + repeated Tuple tuples = 10; // NOTE(zuyu): For UpdateWorkOrder only. - repeated UpdateGroup update_groups = 10; + repeated UpdateGroup update_groups = 11; - repeated WindowAggregationOperationState window_aggregation_states = 11; + repeated WindowAggregationOperationState window_aggregation_states = 12; - required uint64 query_id = 12; + required uint64 query_id = 13; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index 8333d4b..00d5163 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -41,6 +41,7 @@ add_subdirectory(tests) # Declare micro-libs: add_library(quickstep_queryoptimizer_ExecutionGenerator ExecutionGenerator.cpp ExecutionGenerator.hpp) +add_library(quickstep_queryoptimizer_LIPFilterGenerator LIPFilterGenerator.cpp LIPFilterGenerator.hpp) add_library(quickstep_queryoptimizer_LogicalGenerator LogicalGenerator.cpp LogicalGenerator.hpp) add_library(quickstep_queryoptimizer_LogicalToPhysicalMapper ../empty_src.cpp @@ -72,6 +73,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_expressions_windowaggregation_WindowAggregateFunction_proto quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryContext_proto + quickstep_queryoptimizer_LIPFilterGenerator quickstep_queryoptimizer_OptimizerContext quickstep_queryoptimizer_QueryHandle quickstep_queryoptimizer_QueryPlan @@ -99,6 +101,7 @@ 
target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_queryoptimizer_physical_HashJoin quickstep_queryoptimizer_physical_InsertSelection quickstep_queryoptimizer_physical_InsertTuple + quickstep_queryoptimizer_physical_LIPFilterConfiguration quickstep_queryoptimizer_physical_NestedLoopsJoin quickstep_queryoptimizer_physical_PatternMatcher quickstep_queryoptimizer_physical_Physical @@ -152,6 +155,23 @@ if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_catalog_Catalog_proto) endif() +target_link_libraries(quickstep_queryoptimizer_LIPFilterGenerator + glog + quickstep_catalog_CatalogAttribute + quickstep_queryexecution_QueryContext + quickstep_queryexecution_QueryContext_proto + quickstep_queryoptimizer_QueryPlan + quickstep_queryoptimizer_expressions_ExprId + quickstep_queryoptimizer_physical_Aggregate + quickstep_queryoptimizer_physical_HashJoin + quickstep_queryoptimizer_physical_LIPFilterConfiguration + quickstep_queryoptimizer_physical_Physical + quickstep_queryoptimizer_physical_Selection + quickstep_relationaloperators_RelationalOperator + quickstep_types_Type + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilter + quickstep_utility_lipfilter_LIPFilter_proto) target_link_libraries(quickstep_queryoptimizer_LogicalGenerator glog quickstep_parser_ParseStatement @@ -225,6 +245,7 @@ target_link_libraries(quickstep_queryoptimizer_Validator add_library(quickstep_queryoptimizer ../empty_src.cpp QueryOptimizerModule.hpp) target_link_libraries(quickstep_queryoptimizer quickstep_queryoptimizer_ExecutionGenerator + quickstep_queryoptimizer_LIPFilterGenerator quickstep_queryoptimizer_LogicalGenerator quickstep_queryoptimizer_LogicalToPhysicalMapper quickstep_queryoptimizer_Optimizer http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git 
a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 5a701b7..2e0d8f3 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -54,6 +54,7 @@ #include "expressions/window_aggregation/WindowAggregateFunction.pb.h" #include "query_execution/QueryContext.hpp" #include "query_execution/QueryContext.pb.h" +#include "query_optimizer/LIPFilterGenerator.hpp" #include "query_optimizer/OptimizerContext.hpp" #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" @@ -76,6 +77,7 @@ #include "query_optimizer/physical/HashJoin.hpp" #include "query_optimizer/physical/InsertSelection.hpp" #include "query_optimizer/physical/InsertTuple.hpp" +#include "query_optimizer/physical/LIPFilterConfiguration.hpp" #include "query_optimizer/physical/NestedLoopsJoin.hpp" #include "query_optimizer/physical/PatternMatcher.hpp" #include "query_optimizer/physical/Physical.hpp" @@ -153,9 +155,6 @@ static const volatile bool aggregate_hashtable_type_dummy DEFINE_bool(parallelize_load, true, "Parallelize loading data files."); -DEFINE_bool(optimize_joins, false, - "Enable post execution plan generation optimizations for joins."); - namespace E = ::quickstep::optimizer::expressions; namespace P = ::quickstep::optimizer::physical; namespace S = ::quickstep::serialization; @@ -171,6 +170,12 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) { cost_model_for_hash_join_.reset( new cost::SimpleCostModel(top_level_physical_plan_->shared_subplans())); + const auto &lip_filter_configuration = + top_level_physical_plan_->lip_filter_configuration(); + if (lip_filter_configuration != nullptr) { + lip_filter_generator_.reset(new LIPFilterGenerator(lip_filter_configuration)); + } + const CatalogRelation *result_relation = nullptr; try { @@ -179,6 +184,11 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) { } 
generatePlanInternal(top_level_physical_plan_->plan()); + // Deploy LIPFilters if enabled. + if (lip_filter_generator_ != nullptr) { + lip_filter_generator_->deployLIPFilters(execution_plan_, query_context_proto_); + } + // Set the query result relation if the input plan exists in physical_to_execution_map_, // which indicates the plan is the result of a SELECT query. const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator it = @@ -235,6 +245,11 @@ void ExecutionGenerator::generatePlanInternal( generatePlanInternal(child); } + // If enabled, collect attribute substitution map for LIPFilterGenerator. + if (lip_filter_generator_ != nullptr) { + lip_filter_generator_->registerAttributeMap(physical_plan, attribute_substitution_map_); + } + switch (physical_plan->getPhysicalType()) { case P::PhysicalType::kAggregate: return convertAggregate( @@ -566,6 +581,10 @@ void ExecutionGenerator::convertSelection( std::forward_as_tuple(select_index, output_relation)); temporary_relation_info_vec_.emplace_back(select_index, output_relation); + + if (lip_filter_generator_ != nullptr) { + lip_filter_generator_->addSelectionInfo(physical_selection, select_index); + } } void ExecutionGenerator::convertSharedSubplanReference(const physical::SharedSubplanReferencePtr &physical_plan) { @@ -794,6 +813,12 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { std::forward_as_tuple(join_operator_index, output_relation)); temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation); + + if (lip_filter_generator_ != nullptr) { + lip_filter_generator_->addHashJoinInfo(physical_plan, + build_operator_index, + join_operator_index); + } } void ExecutionGenerator::convertNestedLoopsJoin( @@ -1421,6 +1446,11 @@ void ExecutionGenerator::convertAggregate( execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index, finalize_aggregation_operator_index, true); + + if (lip_filter_generator_ != nullptr) { + 
lip_filter_generator_->addAggregateInfo(physical_plan, + aggregation_operator_index); + } } void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index b7d8ef9..55197c9 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -33,6 +33,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryContext.hpp" #include "query_execution/QueryContext.pb.h" +#include "query_optimizer/LIPFilterGenerator.hpp" #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" #include "query_optimizer/cost_model/CostModel.hpp" @@ -427,6 +428,9 @@ class ExecutionGenerator { physical::TopLevelPlanPtr top_level_physical_plan_; + // Sub-generator for deploying LIP (lookahead information passing) filters. + std::unique_ptr<LIPFilterGenerator> lip_filter_generator_; + DISALLOW_COPY_AND_ASSIGN(ExecutionGenerator); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_optimizer/LIPFilterGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/LIPFilterGenerator.cpp b/query_optimizer/LIPFilterGenerator.cpp new file mode 100644 index 0000000..404037e --- /dev/null +++ b/query_optimizer/LIPFilterGenerator.cpp @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "query_optimizer/LIPFilterGenerator.hpp" + +#include <map> +#include <utility> +#include <vector> + +#include "catalog/CatalogAttribute.hpp" +#include "query_execution/QueryContext.pb.h" +#include "relational_operators/RelationalOperator.hpp" +#include "types/Type.hpp" +#include "utility/lip_filter/LIPFilter.hpp" +#include "utility/lip_filter/LIPFilter.pb.h" + +#include "glog/logging.h" + +namespace quickstep { +namespace optimizer { + +namespace E = ::quickstep::optimizer::expressions; +namespace P = ::quickstep::optimizer::physical; + +void LIPFilterGenerator::registerAttributeMap( + const P::PhysicalPtr &node, + const std::unordered_map<E::ExprId, const CatalogAttribute *> &attribute_substitution_map) { + // Check if a builder is attached to node. + const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap(); + const auto build_it = build_info_map.find(node); + if (build_it != build_info_map.end()) { + auto &map_entry = attribute_map_[node]; + for (const auto &info : build_it->second) { + E::ExprId attr_id = info.build_attribute->id(); + map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id)); + } + } + // Check if a prober is attached to node. 
+ const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap(); + const auto probe_it = probe_info_map.find(node); + if (probe_it != probe_info_map.end()) { + auto &map_entry = attribute_map_[node]; + for (const auto &info : probe_it->second) { + E::ExprId attr_id = info.probe_attribute->id(); + map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id)); + } + } +} + +void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan, + serialization::QueryContext *query_context_proto) const { + LIPFilterBuilderMap lip_filter_builder_map; + + // Deploy builders + const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap(); + for (const auto &info : builder_infos_) { + const auto build_it = build_info_map.find(info.builder_node); + if (build_it != build_info_map.end()) { + deployBuilderInternal(execution_plan, + query_context_proto, + info.builder_node, + info.builder_operator_index, + build_it->second, + &lip_filter_builder_map); + } + } + + // Deploy probers + const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap(); + for (const auto &info : prober_infos_) { + const auto probe_it = probe_info_map.find(info.prober_node); + if (probe_it != probe_info_map.end()) { + deployProberInteral(execution_plan, + query_context_proto, + info.prober_node, + info.prober_operator_index, + probe_it->second, + lip_filter_builder_map); + } + } +} + +void LIPFilterGenerator::deployBuilderInternal( + QueryPlan *execution_plan, + serialization::QueryContext *query_context_proto, + const physical::PhysicalPtr &builder_node, + const QueryPlan::DAGNodeIndex builder_operator_index, + const std::vector<physical::LIPFilterBuildInfo> &build_info_vec, + LIPFilterBuilderMap *lip_filter_builder_map) const { + const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size(); + auto *lip_filter_deployment_info_proto = + query_context_proto->add_lip_filter_deployments(); + 
lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::BUILD); + + const auto &builder_attribute_map = attribute_map_.at(builder_node); + for (const auto &info : build_info_vec) { + // Add the LIPFilter information into query context. + const QueryContext::lip_filter_id lip_filter_id = query_context_proto->lip_filters_size(); + serialization::LIPFilter *lip_filter_proto = query_context_proto->add_lip_filters(); + const CatalogAttribute *target_attr = builder_attribute_map.at(info.build_attribute->id()); + const Type &attr_type = target_attr->getType(); + + switch (info.filter_type) { + case LIPFilterType::kSingleIdentityHashFilter: { + DCHECK(!attr_type.isVariableLength()); + lip_filter_proto->set_lip_filter_type( + serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER); + lip_filter_proto->SetExtension( + serialization::SingleIdentityHashFilter::filter_cardinality, info.filter_cardinality); + lip_filter_proto->SetExtension( + serialization::SingleIdentityHashFilter::attribute_size, attr_type.minimumByteLength()); + break; + } + default: + LOG(FATAL) << "Unsupported LIPFilter type"; + break; + } + + // Register the builder information which is needed later by the probers. + lip_filter_builder_map->emplace( + std::make_pair(info.build_attribute->id(), builder_node), + std::make_pair(lip_filter_id, builder_operator_index)); + + // Add the builder deployment information into query context. + auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries(); + lip_filter_entry_proto->set_lip_filter_id(lip_filter_id); + lip_filter_entry_proto->set_attribute_id(target_attr->getID()); + lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(attr_type.getProto()); + } + + // Attach the LIPFilterDeployment information to the RelationalOperator. 
+ RelationalOperator *relop = + execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(builder_operator_index); + relop->deployLIPFilters(lip_deployment_index); +} + +void LIPFilterGenerator::deployProberInteral( + QueryPlan *execution_plan, + serialization::QueryContext *query_context_proto, + const physical::PhysicalPtr &prober_node, + const QueryPlan::DAGNodeIndex prober_operator_index, + const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec, + const LIPFilterBuilderMap &lip_filter_builder_map) const { + const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size(); + auto *lip_filter_deployment_info_proto = + query_context_proto->add_lip_filter_deployments(); + lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::PROBE); + + const auto &prober_attribute_map = attribute_map_.at(prober_node); + for (const auto &info : probe_info_vec) { + // Find the corresponding builder for the to-be-probed LIPFilter. + const auto &builder_info = + lip_filter_builder_map.at( + std::make_pair(info.build_attribute->id(), info.builder)); + const CatalogAttribute *target_attr = prober_attribute_map.at(info.probe_attribute->id()); + + // Add the prober deployment information into query context. + auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries(); + lip_filter_entry_proto->set_lip_filter_id(builder_info.first); + lip_filter_entry_proto->set_attribute_id(target_attr->getID()); + lip_filter_entry_proto->mutable_attribute_type()->CopyFrom( + target_attr->getType().getProto()); + + // A prober must wait until the corresponding builder has completed building + // the LIPFilter. + execution_plan->addOrUpgradeDirectDependency(prober_operator_index, + builder_info.second, + true /* is_pipeline_breaker */); + } + + // Attach the LIPFilterDeployment information to the RelationalOperator. 
+ RelationalOperator *relop = + execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(prober_operator_index); + relop->deployLIPFilters(lip_deployment_index); +} + +} // namespace optimizer +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_optimizer/LIPFilterGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/LIPFilterGenerator.hpp b/query_optimizer/LIPFilterGenerator.hpp new file mode 100644 index 0000000..9d191a1 --- /dev/null +++ b/query_optimizer/LIPFilterGenerator.hpp @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_QUERY_OPTIMIZER_LIP_FILTER_GENERATOR_HPP_ +#define QUICKSTEP_QUERY_OPTIMIZER_LIP_FILTER_GENERATOR_HPP_ + +#include <map> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "query_execution/QueryContext.hpp" +#include "query_optimizer/QueryPlan.hpp" +#include "query_optimizer/expressions/ExprId.hpp" +#include "query_optimizer/physical/LIPFilterConfiguration.hpp" +#include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/HashJoin.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/physical/Selection.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +namespace serialization { class QueryContext; } + +class CatalogAttribute; + +namespace optimizer { + +/** \addtogroup QueryOptimizer + * @{ + */ + +/** + * @brief Generates backend LIPFilter deployments from the physical plan's information. + */ +class LIPFilterGenerator { + public: + /** + * @brief Constructor. + * + * @param lip_filter_configuration The LIPFilter configuration information + * generated by the physical optimizer. + */ + explicit LIPFilterGenerator( + const physical::LIPFilterConfigurationPtr &lip_filter_configuration) + : lip_filter_configuration_(lip_filter_configuration) { + DCHECK(lip_filter_configuration_ != nullptr); + } + + /** + * @brief Collect the ExprId to CatalogAttribute mapping information for the + * given physical node. + * + * @param node A physical plan node. + * @param attribute_substitution_map A map that maps each ExprId to the + * backend relation's CatalogAttributes. + */ + void registerAttributeMap( + const physical::PhysicalPtr &node, + const std::unordered_map<expressions::ExprId, const CatalogAttribute *> &attribute_substitution_map); + + /** + * @brief Add physical-to-execution mapping information for deploying LIPFilters + * to an aggregation. + * + * @param aggregate A physical Aggregate node.
+ * @param aggregate_operator_index The index of the AggregationOperator that + * corresponds to \p aggregate in the execution plan. + */ + void addAggregateInfo(const physical::AggregatePtr &aggregate, + const QueryPlan::DAGNodeIndex aggregate_operator_index) { + prober_infos_.emplace_back(aggregate, aggregate_operator_index); + } + + /** + * @brief Add physical-to-execution mapping information for deploying LIPFilters + * to a hash-join. + * + * @param hash_join A physical HashJoin node. + * @param build_operator_index The index of the BuildHashOperator that corresponds + * to \p hash_join in the execution plan. + * @param join_operator_index The index of the HashJoinOperator that corresponds + * to \p hash_join in the execution plan. + */ + void addHashJoinInfo(const physical::HashJoinPtr &hash_join, + const QueryPlan::DAGNodeIndex build_operator_index, + const QueryPlan::DAGNodeIndex join_operator_index) { + builder_infos_.emplace_back(hash_join, build_operator_index); + prober_infos_.emplace_back(hash_join, join_operator_index); + } + + /** + * @brief Add physical-to-execution mapping information for deploying LIPFilters + * to a selection. + * + * @param selection A physical Selection node. + * @param select_operator_index The index of the SelectOperator that corresponds + * to \p selection in the execution plan. + */ + void addSelectionInfo(const physical::SelectionPtr &selection, + const QueryPlan::DAGNodeIndex select_operator_index) { + prober_infos_.emplace_back(selection, select_operator_index); + } + + /** + * @brief Deploy the LIPFilters to the execution plan. + * + * @param execution_plan The execution plan. + * @param query_context_proto QueryContext protobuf for the execution plan. + */ + void deployLIPFilters(QueryPlan *execution_plan, + serialization::QueryContext *query_context_proto) const; + + private: + /** + * @brief Internal data structure for representing a LIPFilter builder. 
+ */ + struct BuilderInfo { + BuilderInfo(const physical::PhysicalPtr &builder_node_in, + const QueryPlan::DAGNodeIndex builder_operator_index_in) + : builder_node(builder_node_in), + builder_operator_index(builder_operator_index_in) { + } + const physical::PhysicalPtr builder_node; + const QueryPlan::DAGNodeIndex builder_operator_index; + }; + + /** + * @brief Internal data structure for representing a LIPFilter prober. + */ + struct ProberInfo { + ProberInfo(const physical::PhysicalPtr &prober_node_in, + const QueryPlan::DAGNodeIndex prober_operator_index_in) + : prober_node(prober_node_in), + prober_operator_index(prober_operator_index_in) { + } + const physical::PhysicalPtr prober_node; + const QueryPlan::DAGNodeIndex prober_operator_index; + }; + + // Maps each LIPFilter's building attribute to the LIPFilter's id in QueryContext + // as well as the LIPFilter's building relational operator's index. + typedef std::map<std::pair<expressions::ExprId, physical::PhysicalPtr>, + std::pair<QueryContext::lip_filter_id, QueryPlan::DAGNodeIndex>> LIPFilterBuilderMap; + + void deployBuilderInternal(QueryPlan *execution_plan, + serialization::QueryContext *query_context_proto, + const physical::PhysicalPtr &builder_node, + const QueryPlan::DAGNodeIndex builder_operator_index, + const std::vector<physical::LIPFilterBuildInfo> &build_info_vec, + LIPFilterBuilderMap *lip_filter_builder_map) const; + + void deployProberInteral(QueryPlan *execution_plan, + serialization::QueryContext *query_context_proto, + const physical::PhysicalPtr &prober_node, + const QueryPlan::DAGNodeIndex prober_operator_index, + const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec, + const LIPFilterBuilderMap &lip_filter_builder_map) const; + + const physical::LIPFilterConfigurationPtr lip_filter_configuration_; + + std::vector<BuilderInfo> builder_infos_; + std::vector<ProberInfo> prober_infos_; + + std::map<physical::PhysicalPtr, std::map<expressions::ExprId, const CatalogAttribute *>> 
attribute_map_; + + DISALLOW_COPY_AND_ASSIGN(LIPFilterGenerator); +}; + +/** @} */ + +} // namespace optimizer +} // namespace quickstep + +#endif /* QUICKSTEP_QUERY_OPTIMIZER_LIP_FILTER_GENERATOR_HPP_ */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/query_optimizer/QueryPlan.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/QueryPlan.hpp b/query_optimizer/QueryPlan.hpp index 5cd174c..ef6dff4 100644 --- a/query_optimizer/QueryPlan.hpp +++ b/query_optimizer/QueryPlan.hpp @@ -74,6 +74,38 @@ class QueryPlan { } /** + * @brief Creates a link or upgrades the existing link from \p producer_operator_index + * to \p consumer_operator_index in the DAG. + * + * Depending on whether there is an existing link from \p producer_operator_index + * to \p consumer_operator_index: + * - Case 1, no existing link: + * Creates a link with metadata set to is_pipeline_breaker. + * - Case 2, existing link with metadata \p m: + * Set m = (m | is_pipeline_break). + * + * @param consumer_operator_index The index of the consumer operator. + * @param producer_operator_index The index of the producer operator. + * @param is_pipeline_breaker True if the result from the producer cannot be + * pipelined to the consumer, otherwise false. 
+ */ + inline void addOrUpgradeDirectDependency(DAGNodeIndex consumer_operator_index, + DAGNodeIndex producer_operator_index, + bool is_pipeline_breaker) { + const auto &dependents = dag_operators_.getDependents(producer_operator_index); + const auto consumer_it = dependents.find(consumer_operator_index); + if (consumer_it == dependents.end()) { + dag_operators_.createLink(producer_operator_index, + consumer_operator_index, + is_pipeline_breaker); + } else { + dag_operators_.setLinkMetadata(producer_operator_index, + consumer_operator_index, + consumer_it->second | is_pipeline_breaker); + } + } + + /** * @brief Creates dependencies for a DropTable operator with index * \p drop_operator_index. If \p producer_operator_index * has any dependent, creates a link from \p drop_operator_index http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/relational_operators/RelationalOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp index f0303e5..0a363a5 100644 --- a/relational_operators/RelationalOperator.hpp +++ b/relational_operators/RelationalOperator.hpp @@ -245,6 +245,13 @@ class RelationalOperator { return op_index_; } + /** + * @brief Deploy a group of LIPFilters to this operator. 
+ */ + void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index) { + lip_deployment_index_ = lip_deployment_index; + } + protected: /** * @brief Constructor @@ -257,7 +264,8 @@ class RelationalOperator { const bool blocking_dependencies_met = false) : query_id_(query_id), blocking_dependencies_met_(blocking_dependencies_met), - done_feeding_input_relation_(false) {} + done_feeding_input_relation_(false), + lip_deployment_index_(QueryContext::kInvalidILIPDeploymentId) {} const std::size_t query_id_; @@ -265,6 +273,8 @@ class RelationalOperator { bool done_feeding_input_relation_; std::size_t op_index_; + QueryContext::lip_deployment_id lip_deployment_index_; + private: DISALLOW_COPY_AND_ASSIGN(RelationalOperator); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt index 2232abe..b7224d2 100644 --- a/utility/lip_filter/CMakeLists.txt +++ b/utility/lip_filter/CMakeLists.txt @@ -15,5 +15,58 @@ # specific language governing permissions and limitations # under the License. 
+QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs + utility_lipfilter_LIPFilter_proto_hdrs + LIPFilter.proto) + # Declare micro-libs: -add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp) \ No newline at end of file +add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp) +add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp) +add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp) +add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp) +add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp) +add_library(quickstep_utility_lipfilter_LIPFilter_proto + ${utility_lipfilter_LIPFilter_proto_srcs}) +add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp) + +# Link dependencies: +target_link_libraries(quickstep_utility_lipfilter_LIPFilter + quickstep_catalog_CatalogTypedefs + quickstep_storage_StorageBlockInfo + quickstep_utility_Macros) +target_link_libraries(quickstep_utility_lipfilter_LIPFilterAdaptiveProber + quickstep_catalog_CatalogTypedefs + quickstep_storage_StorageBlockInfo + quickstep_storage_TupleIdSequence + quickstep_storage_ValueAccessor + quickstep_types_Type + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilter) +target_link_libraries(quickstep_utility_lipfilter_LIPFilterBuilder + quickstep_catalog_CatalogTypedefs + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilter) +target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment + quickstep_catalog_CatalogTypedefs + quickstep_types_TypeFactory + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilter + quickstep_utility_lipfilter_LIPFilterAdaptiveProber + quickstep_utility_lipfilter_LIPFilterBuilder + 
quickstep_utility_lipfilter_LIPFilter_proto) +target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory + quickstep_utility_lipfilter_LIPFilter_proto + quickstep_utility_lipfilter_SingleIdentityHashFilter + quickstep_utility_Macros) +target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto + ${PROTOBUF_LIBRARY} + quickstep_types_Type_proto) +target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter + quickstep_catalog_CatalogTypedefs + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil + quickstep_types_Type + quickstep_utility_lipfilter_LIPFilter + quickstep_utility_Macros) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp index 33165ed..682d69f 100644 --- a/utility/lip_filter/LIPFilter.hpp +++ b/utility/lip_filter/LIPFilter.hpp @@ -20,8 +20,18 @@ #ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_ #define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_ +#include <cstddef> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "utility/Macros.hpp" + namespace quickstep { +class Type; +class ValueAccessor; + /** \addtogroup Utility * @{ */ @@ -32,6 +42,65 @@ enum class LIPFilterType { kSingleIdentityHashFilter }; +/** + * @brief Base class for LIP (Lookahead Information Passing) filters. + */ +class LIPFilter { + public: + /** + * @brief Virtual destructor. + */ + virtual ~LIPFilter() {} + + /** + * @brief Get the type of this LIPFilter. + * + * @return The type of this LIPFilter. + */ + LIPFilterType getType() const { + return type_; + } + + /** + * @brief Insert the values drawn from a ValueAccessor into this LIPFilter. + * + * @param accessor A ValueAccessor which will be used to access the values. 
+ * @param attr_id The attribute id of the values to be read from accessor. + * @param attr_type The type of the values. + */ + virtual void insertValueAccessor(ValueAccessor *accessor, + const attribute_id attr_id, + const Type *attr_type) = 0; + + /** + * @brief Filter the given batch of tuples from a ValueAccessor. Remove any + * tuple in the batch that does not have a hit in this filter. + * + * @param accessor A ValueAccessor which will be used to access the tuples. + * @param attr_id The attribute id of the values to be filtered. + * @param is_attr_nullable Whether the values can be NULL. + * @param batch The batch of tuple ids to be filtered. This vector will also + * be updated in place in this method to hold the output tuple ids. + * @param batch_size The input batch size. + * + * @return The output batch size. + */ + virtual std::size_t filterBatch(ValueAccessor *accessor, + const attribute_id attr_id, + const bool is_attr_nullable, + std::vector<tuple_id> *batch, + const std::size_t batch_size) const = 0; + + protected: + explicit LIPFilter(const LIPFilterType &type) + : type_(type) {} + + private: + LIPFilterType type_; + + DISALLOW_COPY_AND_ASSIGN(LIPFilter); +}; + /** @} */ } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilter.proto ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto new file mode 100644 index 0000000..def13dd --- /dev/null +++ b/utility/lip_filter/LIPFilter.proto @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto2"; + +package quickstep.serialization; + +import "types/Type.proto"; + +enum LIPFilterType { + BLOOM_FILTER = 1; + EXACT_FILTER = 2; + SINGLE_IDENTITY_HASH_FILTER = 3; +} + +message LIPFilter { + required LIPFilterType lip_filter_type = 1; + + extensions 16 to max; +} + +message SingleIdentityHashFilter { + extend LIPFilter { + // All required (proto2 extensions cannot be "required"; LIPFilterFactory::ProtoIsValid checks them). + optional uint64 filter_cardinality = 16; + optional uint64 attribute_size = 17; + } +} + +enum LIPFilterActionType { + BUILD = 1; + PROBE = 2; +} + +message LIPFilterDeployment { + message Entry { + required uint32 lip_filter_id = 1; + required int32 attribute_id = 2; + required Type attribute_type = 3; + } + + required LIPFilterActionType action_type = 1; + repeated Entry entries = 2; +} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilterAdaptiveProber.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterAdaptiveProber.hpp b/utility/lip_filter/LIPFilterAdaptiveProber.hpp new file mode 100644 index 0000000..e1a75d6 --- /dev/null +++ b/utility/lip_filter/LIPFilterAdaptiveProber.hpp @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_ +#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_ + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <memory> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/TupleIdSequence.hpp" +#include "storage/ValueAccessor.hpp" +#include "types/Type.hpp" +#include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilter.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** \addtogroup Utility + * @{ + */ + +/** + * @brief Helper class for adaptively applying a group of LIPFilters to a + * ValueAccessor. Here "adaptive" means that the application ordering + * of the filters will be adjusted on the fly based on the filters' miss + * rates. + */ +class LIPFilterAdaptiveProber { + public: + /** + * @brief Constructor. + * + * @param lip_filters The LIPFilters that will be probed. + * @param attr_ids The target attribute ids for the LIPFilters. + * @param attr_types The target attribute types for the LIPFilters. 
+ */ + LIPFilterAdaptiveProber(const std::vector<LIPFilter *> &lip_filters, + const std::vector<attribute_id> &attr_ids, + const std::vector<const Type *> &attr_types) { + DCHECK_EQ(lip_filters.size(), attr_ids.size()); + DCHECK_EQ(lip_filters.size(), attr_types.size()); + + probe_entries_.reserve(lip_filters.size()); + for (std::size_t i = 0; i < lip_filters.size(); ++i) { + DCHECK(lip_filters[i] != nullptr); + probe_entries_.emplace_back( + new ProbeEntry(lip_filters[i], attr_ids[i], attr_types[i])); + } + } + + /** + * @brief Destructor. + */ + ~LIPFilterAdaptiveProber() { + for (ProbeEntry *entry : probe_entries_) { + delete entry; + } + } + + /** + * @brief Apply this group of LIPFilters to the given ValueAccessor. + * + * @param accessor A ValueAccessor to be filtered. + * @return A TupleIdSequence for the hit tuples in the ValueAccessor. + */ + TupleIdSequence* filterValueAccessor(ValueAccessor *accessor) { + const TupleIdSequence *existence_map = accessor->getTupleIdSequenceVirtual(); + if (existence_map == nullptr) { + return filterValueAccessorNoExistenceMap(accessor); + } else { + return filterValueAccessorWithExistenceMap(accessor, existence_map); + } + } + + private: + /** + * @brief Internal data structure for representing each LIPFilter probing entry. + */ + struct ProbeEntry { + ProbeEntry(const LIPFilter *lip_filter_in, + const attribute_id attr_id_in, + const Type *attr_type_in) + : lip_filter(lip_filter_in), + attr_id(attr_id_in), + attr_type(attr_type_in), + miss(0), + cnt(0), miss_rate(0.0f) { + } + + /** + * @brief Whether a LIPFilter is more selective than the other. + */ + static bool isBetterThan(const ProbeEntry *a, + const ProbeEntry *b) { + return a->miss_rate > b->miss_rate; + } + + const LIPFilter *lip_filter; + const attribute_id attr_id; + const Type *attr_type; + std::uint32_t miss; + std::uint32_t cnt; + float miss_rate; + }; + + /** + * @brief Specialized filterValueAccessor implementation where the given + * ValueAccessor has no existence map. 
+ */ + inline TupleIdSequence* filterValueAccessorNoExistenceMap(ValueAccessor *accessor) { + const std::uint32_t num_tuples = accessor->getNumTuplesVirtual(); + std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(num_tuples)); + std::uint32_t next_batch_size = 64u; + std::vector<tuple_id> batch(num_tuples); + + // Apply the filters in a batched manner. + std::uint32_t batch_start = 0; + do { + const std::uint32_t batch_size = + std::min(next_batch_size, num_tuples - batch_start); + for (std::uint32_t i = 0; i < batch_size; ++i) { + batch[i] = batch_start + i; + } + + const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size); + for (std::uint32_t i = 0; i < num_hits; ++i) { + matches->set(batch[i], true); + } + + batch_start += batch_size; + next_batch_size *= 2; + } while (batch_start < num_tuples); + + return matches.release(); + } + + /** + * @brief Specialized filterValueAccessor implementation where the given + * ValueAccessor has an existence map. + */ + inline TupleIdSequence* filterValueAccessorWithExistenceMap(ValueAccessor *accessor, + const TupleIdSequence *existence_map) { + std::unique_ptr<TupleIdSequence> matches( + new TupleIdSequence(existence_map->length())); + std::uint32_t next_batch_size = 64u; + std::uint32_t num_tuples_left = existence_map->numTuples(); + std::vector<tuple_id> batch(num_tuples_left); + + // Apply the filters in a batched manner. + TupleIdSequence::const_iterator tuple_it = existence_map->before_begin(); + do { + const std::uint32_t batch_size = + next_batch_size < num_tuples_left ? 
next_batch_size : num_tuples_left; + for (std::uint32_t i = 0; i < batch_size; ++i) { + ++tuple_it; + batch[i] = *tuple_it; + } + + const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size); + for (std::uint32_t i = 0; i < num_hits; ++i) { + matches->set(batch[i], true); + } + + num_tuples_left -= batch_size; + next_batch_size *= 2; + } while (num_tuples_left > 0); + + return matches.release(); + } + + /** + * @brief Filter the given batch of tuples from the ValueAccessor. Remove any + * tuple in the batch that misses any filter. + */ + inline std::size_t filterBatch(ValueAccessor *accessor, + std::vector<tuple_id> *batch, + std::uint32_t batch_size) { + // Apply the LIPFilters one by one to the batch and update corresponding + // cnt/miss statistics. + for (auto *entry : probe_entries_) { + const std::uint32_t out_size = + entry->lip_filter->filterBatch(accessor, + entry->attr_id, + entry->attr_type->isNullable(), + batch, + batch_size); + entry->cnt += batch_size; + entry->miss += batch_size - out_size; + batch_size = out_size; + } + + // Adaptively adjust the application ordering after each batch. + adaptEntryOrder(); + + return batch_size; + } + + /** + * @brief Adjust LIPFilter application ordering with regard to their miss + * rates (i.e. selectivities). Filters with cnt == 0 get a miss rate of 0, avoiding a NaN (0/0) sort key. 
+ */ + inline void adaptEntryOrder() { + for (auto &entry : probe_entries_) { + entry->miss_rate = (entry->cnt == 0) ? 0.0f : static_cast<float>(entry->miss) / entry->cnt; + } + std::sort(probe_entries_.begin(), + probe_entries_.end(), + ProbeEntry::isBetterThan); + } + + std::vector<ProbeEntry *> probe_entries_; + + DISALLOW_COPY_AND_ASSIGN(LIPFilterAdaptiveProber); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilterBuilder.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp new file mode 100644 index 0000000..deb8f66 --- /dev/null +++ b/utility/lip_filter/LIPFilterBuilder.hpp @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_ +#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_ + +#include <cstddef> +#include <memory> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilter.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class Type; +class ValueAccessor; + +/** \addtogroup Utility + * @{ + */ + +class LIPFilterBuilder; +typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr; + +/** + * @brief Helper class for building LIPFilters from a relation (i.e. ValueAccessor). + */ +class LIPFilterBuilder { + public: + /** + * @brief Constructor. + * + * @param lip_filters The LIPFilters that will be built. + * @param attr_ids The target attribute ids for the LIPFilters. + * @param attr_types The target attribute types for the LIPFilters. + */ + LIPFilterBuilder(const std::vector<LIPFilter *> &lip_filters, + const std::vector<attribute_id> &attr_ids, + const std::vector<const Type *> &attr_types) { + DCHECK_EQ(lip_filters.size(), attr_ids.size()); + DCHECK_EQ(lip_filters.size(), attr_types.size()); + + build_entries_.reserve(lip_filters.size()); + for (std::size_t i = 0; i < lip_filters.size(); ++i) { + build_entries_.emplace_back(lip_filters[i], attr_ids[i], attr_types[i]); + } + } + + /** + * @brief Insert all the values from the given ValueAccessor into the attached + * LIPFilters, using each build entry's target attribute id and type. + * + * @param accessor The ValueAccessor which will be used to access the values. + */ + void insertValueAccessor(ValueAccessor *accessor) { + for (auto &entry : build_entries_) { + entry.lip_filter->insertValueAccessor(accessor, + entry.attr_id, + entry.attr_type); + } + } + + private: + /** + * @brief Internal data structure for representing each LIPFilter building entry. 
+ */ + struct BuildEntry { + BuildEntry(LIPFilter *lip_filter_in, + const attribute_id attr_id_in, + const Type *attr_type_in) + : lip_filter(lip_filter_in), + attr_id(attr_id_in), + attr_type(attr_type_in) { + } + LIPFilter *lip_filter;  // Not owned; this class never deletes it. + const attribute_id attr_id; + const Type *attr_type; + }; + + std::vector<BuildEntry> build_entries_; + + DISALLOW_COPY_AND_ASSIGN(LIPFilterBuilder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilterDeployment.cpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp new file mode 100644 index 0000000..cd4d90f --- /dev/null +++ b/utility/lip_filter/LIPFilterDeployment.cpp @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ **/ + +#include "utility/lip_filter/LIPFilterDeployment.hpp" + +#include <memory> +#include <vector> + +#include "types/TypeFactory.hpp" +#include "utility/lip_filter/LIPFilter.hpp" +#include "utility/lip_filter/LIPFilter.pb.h" +#include "utility/lip_filter/LIPFilterBuilder.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +LIPFilterDeployment::LIPFilterDeployment( + const serialization::LIPFilterDeployment &proto, + const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) { + switch (proto.action_type()) { + case serialization::LIPFilterActionType::BUILD: + action_type_ = LIPFilterActionType::kBuild; + break; + case serialization::LIPFilterActionType::PROBE: + action_type_ = LIPFilterActionType::kProbe; + break; + default: + LOG(FATAL) << "Unsupported LIPFilterActionType: " + << serialization::LIPFilterActionType_Name(proto.action_type()); + } + + for (int i = 0; i < proto.entries_size(); ++i) { + const auto &entry_proto = proto.entries(i); + lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get()); + attr_ids_.emplace_back(entry_proto.attribute_id()); + attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type())); + } +} + +bool LIPFilterDeployment::ProtoIsValid( + const serialization::LIPFilterDeployment &proto) { + if (proto.action_type() != serialization::LIPFilterActionType::BUILD && + proto.action_type() != serialization::LIPFilterActionType::PROBE) { + // An unknown action type makes the proto invalid; report it rather than crash. 
+ return false; + } + if (proto.entries_size() == 0) { + return false; + } + for (int i = 0; i < proto.entries_size(); ++i) { + const auto &entry_proto = proto.entries(i); + if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) { + return false; + } + } + return true; +} + +LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const { + DCHECK(action_type_ == 
LIPFilterActionType::kBuild); + return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_); +} + +LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const { + DCHECK(action_type_ == LIPFilterActionType::kProbe); + return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilterDeployment.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp new file mode 100644 index 0000000..9b37f88 --- /dev/null +++ b/utility/lip_filter/LIPFilterDeployment.hpp @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ **/ + +#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_ +#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_ + +#include <memory> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "utility/Macros.hpp" + +namespace quickstep { + +namespace serialization { class LIPFilterDeployment; } + +class LIPFilter; +class LIPFilterBuilder; +class LIPFilterAdaptiveProber; +class Type; + +/** \addtogroup Utility + * @{ + */ + +enum class LIPFilterActionType { + kBuild = 0, + kProbe +}; + +/** + * @brief Helper class for organizing a group of LIPFilters in the backend. + * Each LIPFilterDeployment object is attached to a RelationalOperator. + */ +class LIPFilterDeployment { + public: + /** + * @brief Constructor. + * + * @param proto The Protocol Buffer representation of a LIPFilterDeployment. + * @param lip_filters The LIPFilter pool, indexed by each proto entry's lip_filter_id. + */ + LIPFilterDeployment(const serialization::LIPFilterDeployment &proto, + const std::vector<std::unique_ptr<LIPFilter>> &lip_filters); + /** + * @brief Determine if a serialized protobuf representation of a + * LIPFilterDeployment is fully-formed and valid. + * + * @param proto A serialized protobuf representation of a LIPFilterDeployment + * to check for validity. + * @return Whether proto is fully-formed and valid. + **/ + static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto); + + /** + * @brief Get the action type for this group of LIPFilters (i.e. whether + * to build or probe the filters). + * + * @return The action type. + */ + LIPFilterActionType getActionType() const { + return action_type_; + } + + /** + * @brief Create a LIPFilterBuilder for this group of LIPFilters. + * + * @return A new LIPFilterBuilder object for this group of LIPFilters. + * Caller should take ownership of the returned object. + */ + LIPFilterBuilder* createLIPFilterBuilder() const; + + /** + * @brief Create a LIPFilterAdaptiveProber for this group of LIPFilters. 
+ * + * @return A new LIPFilterAdaptiveProber object for this group of LIPFilters. + * Caller should take ownership of the returned object. + */ + LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const; + + private: + LIPFilterActionType action_type_; + + std::vector<LIPFilter *> lip_filters_;  // Not owned; the pool passed to the constructor must outlive this object. + std::vector<attribute_id> attr_ids_; + std::vector<const Type *> attr_types_; + + DISALLOW_COPY_AND_ASSIGN(LIPFilterDeployment); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilterFactory.cpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp new file mode 100644 index 0000000..ebc4a0e --- /dev/null +++ b/utility/lip_filter/LIPFilterFactory.cpp @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterFactory.hpp"
+
+#include <cstddef>
+#include <cstdint>
+
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
+  // Catch malformed protos early in debug builds: e.g. a zero
+  // filter_cardinality would otherwise yield a filter that divides by zero
+  // when hashing.
+  DCHECK(ProtoIsValid(proto))
+      << "Attempted to create LIPFilter from an invalid proto description:\n"
+      << proto.DebugString();
+
+  switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+      const std::size_t filter_cardinality =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+
+      // Hash on the widest unsigned integer type whose size does not exceed
+      // attr_size, so hashing a value never reads past its last byte.
+      if (attr_size >= 8) {
+        return new SingleIdentityHashFilter<std::uint64_t>(filter_cardinality);
+      } else if (attr_size >= 4) {
+        return new SingleIdentityHashFilter<std::uint32_t>(filter_cardinality);
+      } else if (attr_size >= 2) {
+        return new SingleIdentityHashFilter<std::uint16_t>(filter_cardinality);
+      } else {
+        return new SingleIdentityHashFilter<std::uint8_t>(filter_cardinality);
+      }
+    }
+    // TODO(jianqiao): handle the BLOOM_FILTER and EXACT_FILTER implementations.
+    default:
+      LOG(FATAL) << "Unsupported LIP filter type: "
+                 << serialization::LIPFilterType_Name(proto.lip_filter_type());
+  }
+  return nullptr;
+}
+
+bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
+  // A proto with missing required fields is not fully-formed.
+  if (!proto.IsInitialized()) {
+    return false;
+  }
+
+  switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+      const std::size_t filter_cardinality =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+      // The filter hashes modulo filter_cardinality, so it must be non-zero;
+      // a zero attr_size leaves no bytes to hash.
+      return (attr_size != 0 && filter_cardinality != 0);
+    }
+    default:
+      // A validity check reports unsupported (e.g. not yet implemented)
+      // filter types as invalid rather than aborting the process; the caller
+      // decides how to react to an invalid proto.
+      break;
+  }
+  return false;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/LIPFilterFactory.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.hpp b/utility/lip_filter/LIPFilterFactory.hpp
new file mode 100644
index 0000000..b8301b8
--- /dev/null
+++ b/utility/lip_filter/LIPFilterFactory.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+namespace serialization { class LIPFilter; }
+
+class LIPFilter;
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief All-static factory object that provides access to various implementations
+ *        of LIPFilters.
+ */
+class LIPFilterFactory {
+ public:
+  /**
+   * @brief Reconstruct a LIPFilter from its serialized Protocol Buffer form.
+   *        The concrete LIPFilter implementation is selected by
+   *        proto.lip_filter_type().
+   *
+   * @param proto The Protocol Buffer representation of a LIPFilter object.
+   *        Callers are expected to have checked it with ProtoIsValid().
+   * @return A new LIPFilter reconstructed from the supplied Protocol Buffer.
+   *         Caller should take ownership of the returned object.
+   */
+  static LIPFilter* ReconstructFromProto(const serialization::LIPFilter &proto);
+
+  /**
+   * @brief Check whether a serialization::LIPFilter is fully-formed and
+   *        all parts are valid.
+   *
+   * @param proto A serialized Protocol Buffer representation of a LIPFilter.
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::LIPFilter &proto);
+
+ private:
+  // Never instantiated: every member of this factory is static.
+  LIPFilterFactory() {}
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterFactory);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b57d551c/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
new file mode 100644
index 0000000..0f213d6
--- /dev/null
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief Specialized bloom filter that uses only one hash function. The hash
+ *        function is an "identity" function: it reads the first
+ *        sizeof(CppType) bytes of a value as a CppType and uses that integer
+ *        (modulo the filter cardinality) as the value's hash code.
+ *
+ * @note SingleIdentityHashFilter is most effective when applied to fixed-length
+ *       integer values. It cannot be applied to variable-length values unless
+ *       the corresponding value Type has its minimumByteLength() no smaller
+ *       than sizeof(CppType), since sizeof(CppType) bytes are read per value.
+ */
+template <typename CppType>
+class SingleIdentityHashFilter : public LIPFilter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param filter_cardinality The cardinality of this hash filter, i.e. the
+   *        number of bits in the filter. Must be positive.
+   */
+  explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
+      : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
+        filter_cardinality_(filter_cardinality),
+        bit_array_(GetByteSize(filter_cardinality)) {
+    // insert() and contains() take the hash modulo filter_cardinality_, so a
+    // zero cardinality would be a division by zero.
+    DCHECK_GT(filter_cardinality, 0u);
+    // No explicit clearing is needed: the sized std::vector constructor
+    // value-initializes every std::atomic<std::uint8_t> element to zero.
+    // (std::memset() over std::atomic objects would bypass their interface
+    // and is flagged by GCC's -Wclass-memaccess.)
+  }
+
+  /**
+   * @brief Insert every value of attribute attr_id in accessor into the
+   *        filter. NULL values are skipped when attr_type is nullable.
+   */
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        this->insertValueAccessorInternal<true>(accessor, attr_id);
+      } else {
+        this->insertValueAccessorInternal<false>(accessor, attr_id);
+      }
+    });
+  }
+
+  /**
+   * @brief Probe the filter with the values of attribute attr_id for the
+   *        first batch_size tuple ids in batch. Tuple ids whose values may be
+   *        in the filter are compacted, in their original order, to the front
+   *        of batch. When is_attr_nullable is true, NULL values never pass.
+   *
+   * @return The number of tuple ids kept at the front of batch.
+   */
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    DCHECK(batch != nullptr);
+    DCHECK_LE(batch_size, batch->size());
+
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+      if (is_attr_nullable) {
+        return this->filterBatchInternal<true>(accessor, attr_id, batch, batch_size);
+      } else {
+        return this->filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+      }
+    });
+  }
+
+ private:
+  /**
+   * @brief Number of bytes needed to hold bit_size bits, i.e. bit_size / 8
+   *        rounded up.
+   */
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7) / 8;
+  }
+
+  /**
+   * @brief Iterate through the accessor and hash values into the internal bit
+   *        array. NULL values are skipped when is_attr_nullable is true.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  /**
+   * @brief Probe the filter with the values of attr_id at the tuple ids in
+   *        batch[0, batch_size), moving the tuple ids that pass to the front
+   *        of batch (order preserved). NULL values are dropped when
+   *        is_attr_nullable is true.
+   *
+   * @return The number of tuple ids that passed.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = batch->at(i);
+      const void *value =
+          accessor->template getUntypedValueAtAbsolutePosition(attr_id, tid);
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        // out_size <= i, so this never overwrites a tuple id not yet read.
+        batch->at(out_size) = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  /**
+   * @brief Compute the bit position of a value in the filter: the value's
+   *        first sizeof(CppType) bytes read as a CppType, modulo
+   *        filter_cardinality_.
+   */
+  inline std::size_t getBitPosition(const void *key_begin) const {
+    // std::memcpy() rather than dereferencing a reinterpret_cast'ed pointer:
+    // it is well-defined for unaligned keys and does not violate strict
+    // aliasing; compilers typically lower this fixed-size copy to one load.
+    CppType key;
+    std::memcpy(&key, key_begin, sizeof(CppType));
+    return key % filter_cardinality_;
+  }
+
+  /**
+   * @brief Inserts a given value into the hash filter. fetch_or is atomic, so
+   *        concurrent insert() calls do not lose each other's bits.
+   */
+  inline void insert(const void *key_begin) {
+    const std::size_t bit_position = getBitPosition(key_begin);
+    bit_array_[bit_position >> 3u].fetch_or(
+        static_cast<std::uint8_t>(1u << (bit_position & 7u)),
+        std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the hash filter.
+   *        If true is returned, then a value may or may not be present in the hash filter.
+   *        If false is returned, a value is certainly not present in the hash filter.
+   */
+  inline bool contains(const void *key_begin) const {
+    const std::size_t bit_position = getBitPosition(key_begin);
+    return (bit_array_[bit_position >> 3u].load(std::memory_order_relaxed)
+            & (1u << (bit_position & 7u))) != 0;
+  }
+
+  // Number of bits in the filter; never zero (checked in the constructor).
+  const std::size_t filter_cardinality_;
+  // NOTE(review): alignas applies to the std::vector object itself (its
+  // internal pointers), not to the heap buffer that holds the bits.
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_