Optimizer changes for the LIPFilter feature.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7a464434
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7a464434
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7a464434

Branch: refs/heads/lip-refactor
Commit: 7a46443491b1c25af3d7aaf738d6e9b096ed52d0
Parents: 160276c
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Harshad Deshmukh <hbdeshm...@apache.org>
Committed: Tue Oct 18 11:26:02 2016 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |  12 +-
 query_optimizer/ExecutionGenerator.cpp          |  49 ---
 query_optimizer/ExecutionGenerator.hpp          |   5 +-
 query_optimizer/ExecutionHeuristics.cpp         | 129 --------
 query_optimizer/ExecutionHeuristics.hpp         | 157 ----------
 query_optimizer/PhysicalGenerator.cpp           |  18 +-
 .../cost_model/StarSchemaSimpleCostModel.cpp    |   2 +-
 query_optimizer/physical/CMakeLists.txt         |   7 +
 .../physical/LIPFilterConfiguration.hpp         | 171 ++++++++++
 query_optimizer/physical/TopLevelPlan.hpp       |  43 ++-
 query_optimizer/rules/AttachLIPFilters.cpp      | 248 +++++++++++++++
 query_optimizer/rules/AttachLIPFilters.hpp      | 151 +++++++++
 query_optimizer/rules/CMakeLists.txt            |  19 ++
 .../StarSchemaHashJoinOrderOptimization.cpp     | 273 ++++++++++------
 .../StarSchemaHashJoinOrderOptimization.hpp     | 118 +++++--
 query_optimizer/tests/CMakeLists.txt            |  16 -
 .../tests/ExecutionHeuristics_unittest.cpp      | 311 -------------------
 utility/CMakeLists.txt                          |  15 +
 utility/DisjointTreeForest.hpp                  | 152 +++++++++
 utility/PlanVisualizer.cpp                      |  51 ++-
 utility/PlanVisualizer.hpp                      |   3 +
 utility/lip_filter/CMakeLists.txt               |  19 ++
 utility/lip_filter/LIPFilter.hpp                |  39 +++
 utility/tests/DisjointTreeForest_unittest.cpp   |  98 ++++++
 24 files changed, 1277 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 988ffd8..fa9141c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -41,7 +41,6 @@ add_subdirectory(tests)
 
 # Declare micro-libs:
 add_library(quickstep_queryoptimizer_ExecutionGenerator ExecutionGenerator.cpp 
ExecutionGenerator.hpp)
-add_library(quickstep_queryoptimizer_ExecutionHeuristics 
ExecutionHeuristics.cpp ExecutionHeuristics.hpp)
 add_library(quickstep_queryoptimizer_LogicalGenerator LogicalGenerator.cpp 
LogicalGenerator.hpp)
 add_library(quickstep_queryoptimizer_LogicalToPhysicalMapper
             ../empty_src.cpp
@@ -73,7 +72,6 @@ 
target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       
quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryoptimizer_ExecutionHeuristics
                       quickstep_queryoptimizer_OptimizerContext
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
@@ -153,14 +151,6 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                         quickstep_catalog_Catalog_proto)
 endif()
-target_link_libraries(quickstep_queryoptimizer_ExecutionHeuristics
-                      glog
-                      quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryoptimizer_QueryPlan
-                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
                       glog
                       quickstep_parser_ParseStatement
@@ -196,6 +186,7 @@ 
target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       
quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild
@@ -233,7 +224,6 @@ target_link_libraries(quickstep_queryoptimizer_Validator
 add_library(quickstep_queryoptimizer ../empty_src.cpp QueryOptimizerModule.hpp)
 target_link_libraries(quickstep_queryoptimizer
                       quickstep_queryoptimizer_ExecutionGenerator
-                      quickstep_queryoptimizer_ExecutionHeuristics
                       quickstep_queryoptimizer_LogicalGenerator
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
                       quickstep_queryoptimizer_Optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp 
b/query_optimizer/ExecutionGenerator.cpp
index 9347c9c..09ef9e0 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -54,7 +54,6 @@
 #include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/ExecutionHeuristics.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
@@ -211,11 +210,6 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr 
&physical_plan) {
         temporary_relation_info.producer_operator_index);
   }
 
-  // Optimize execution plan based on heuristics captured during execution 
plan generation, if enabled.
-  if (FLAGS_optimize_joins) {
-    execution_heuristics_->optimizeExecutionPlan(execution_plan_, 
query_context_proto_);
-  }
-
 #ifdef QUICKSTEP_DISTRIBUTED
   catalog_database_cache_proto_->set_name(catalog_database_->getName());
 
@@ -600,34 +594,14 @@ void ExecutionGenerator::convertHashJoin(const 
P::HashJoinPtr &physical_plan) {
   std::vector<attribute_id> probe_attribute_ids;
   std::vector<attribute_id> build_attribute_ids;
 
-  std::vector<attribute_id> probe_original_attribute_ids;
-  std::vector<attribute_id> build_original_attribute_ids;
-
-  const CatalogRelation *referenced_stored_probe_relation = nullptr;
-  const CatalogRelation *referenced_stored_build_relation = nullptr;
-
   std::size_t build_cardinality = 
cost_model_->estimateCardinality(build_physical);
 
   bool any_probe_attributes_nullable = false;
   bool any_build_attributes_nullable = false;
 
-  bool skip_hash_join_optimization = false;
-
   const std::vector<E::AttributeReferencePtr> &left_join_attributes =
       physical_plan->left_join_attributes();
   for (const E::AttributeReferencePtr &left_join_attribute : 
left_join_attributes) {
-    // Try to determine the original stored relation referenced in the Hash 
Join.
-    referenced_stored_probe_relation =
-        
catalog_database_->getRelationByName(left_join_attribute->relation_name());
-    if (referenced_stored_probe_relation == nullptr) {
-      // Hash Join optimizations are not possible, if the referenced relation 
cannot be determined.
-      skip_hash_join_optimization = true;
-    } else {
-      const attribute_id probe_operator_attribute_id =
-          
referenced_stored_probe_relation->getAttributeByName(left_join_attribute->attribute_name())->getID();
-      probe_original_attribute_ids.emplace_back(probe_operator_attribute_id);
-    }
-
     const CatalogAttribute *probe_catalog_attribute
         = attribute_substitution_map_[left_join_attribute->id()];
     probe_attribute_ids.emplace_back(probe_catalog_attribute->getID());
@@ -640,18 +614,6 @@ void ExecutionGenerator::convertHashJoin(const 
P::HashJoinPtr &physical_plan) {
   const std::vector<E::AttributeReferencePtr> &right_join_attributes =
       physical_plan->right_join_attributes();
   for (const E::AttributeReferencePtr &right_join_attribute : 
right_join_attributes) {
-    // Try to determine the original stored relation referenced in the Hash 
Join.
-    referenced_stored_build_relation =
-        
catalog_database_->getRelationByName(right_join_attribute->relation_name());
-    if (referenced_stored_build_relation == nullptr) {
-      // Hash Join optimizations are not possible, if the referenced relation 
cannot be determined.
-      skip_hash_join_optimization = true;
-    } else {
-      const attribute_id build_operator_attribute_id =
-          
referenced_stored_build_relation->getAttributeByName(right_join_attribute->attribute_name())->getID();
-      build_original_attribute_ids.emplace_back(build_operator_attribute_id);
-    }
-
     const CatalogAttribute *build_catalog_attribute
         = attribute_substitution_map_[right_join_attribute->id()];
     build_attribute_ids.emplace_back(build_catalog_attribute->getID());
@@ -828,17 +790,6 @@ void ExecutionGenerator::convertHashJoin(const 
P::HashJoinPtr &physical_plan) {
       std::forward_as_tuple(join_operator_index,
                             output_relation));
   temporary_relation_info_vec_.emplace_back(join_operator_index, 
output_relation);
-
-  // Add heuristics for the Hash Join, if enabled.
-  if (FLAGS_optimize_joins && !skip_hash_join_optimization) {
-    execution_heuristics_->addHashJoinInfo(build_operator_index,
-                                           join_operator_index,
-                                           referenced_stored_build_relation,
-                                           referenced_stored_probe_relation,
-                                           
std::move(build_original_attribute_ids),
-                                           
std::move(probe_original_attribute_ids),
-                                           join_hash_table_index);
-  }
 }
 
 void ExecutionGenerator::convertNestedLoopsJoin(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp 
b/query_optimizer/ExecutionGenerator.hpp
index 2aaf5ab..495955e 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -33,7 +33,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/ExecutionHeuristics.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
@@ -102,8 +101,7 @@ class ExecutionGenerator {
       : catalog_database_(DCHECK_NOTNULL(catalog_database)),
         query_handle_(DCHECK_NOTNULL(query_handle)),
         execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
-        
query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
-        execution_heuristics_(new ExecutionHeuristics()) {
+        
query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable()))
 {
     query_context_proto_->set_query_id(query_handle_->query_id());
 #ifdef QUICKSTEP_DISTRIBUTED
     catalog_database_cache_proto_ = 
DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
@@ -386,7 +384,6 @@ class ExecutionGenerator {
   QueryHandle *query_handle_;
   QueryPlan *execution_plan_;  // A part of QueryHandle.
   serialization::QueryContext *query_context_proto_;  // A part of QueryHandle.
-  std::unique_ptr<ExecutionHeuristics> execution_heuristics_;
 
 #ifdef QUICKSTEP_DISTRIBUTED
   serialization::CatalogDatabase *catalog_database_cache_proto_;  // A part of 
QueryHandle.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp 
b/query_optimizer/ExecutionHeuristics.cpp
deleted file mode 100644
index 4fd7320..0000000
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_optimizer/ExecutionHeuristics.hpp"
-
-#include <cstddef>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/QueryPlan.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-namespace optimizer {
-
-void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
-                                                serialization::QueryContext 
*query_context_proto) {
-  // Currently this only optimizes left deep joins using bloom filters.
-  // It uses a simple algorithm to discover the left deep joins.
-  // It starts with the first hash join in the plan and keeps on iterating
-  // over the next hash joins, till a probe on a different relation id is 
found.
-  // The set of hash joins found in this way forms a chain and can be 
recognized
-  // as a left deep join. It becomes a candidate for optimization.
-
-  // The optimization is done by modifying each of the build operators in the 
chain
-  // to generate a bloom filter on the build key during their hash table 
creation.
-  // The leaf-level probe operator is then modified to query all the bloom
-  // filters generated from all the build operators in the chain. These
-  // bloom filters are queried to test the membership of the probe key
-  // just prior to probing the hash table.
-
-  QueryPlan::DAGNodeIndex origin_node = 0;
-  while (origin_node < hash_joins_.size() - 1) {
-    std::vector<std::size_t> chained_nodes;
-    chained_nodes.push_back(origin_node);
-    for (std::size_t i = origin_node + 1; i < hash_joins_.size(); ++i) {
-      const relation_id checked_relation_id = 
hash_joins_[origin_node].referenced_stored_probe_relation_->getID();
-      const relation_id expected_relation_id = 
hash_joins_[i].referenced_stored_probe_relation_->getID();
-      if (checked_relation_id == expected_relation_id) {
-        chained_nodes.push_back(i);
-      } else {
-        break;
-      }
-    }
-
-    // Only chains of length greater than one are suitable candidates for 
semi-join optimization.
-    if (chained_nodes.size() > 1) {
-      std::unordered_map<QueryContext::bloom_filter_id, 
std::vector<attribute_id>> probe_bloom_filter_info;
-      for (const std::size_t node : chained_nodes) {
-        // Provision for a new bloom filter to be used by the build operator.
-        const QueryContext::bloom_filter_id bloom_filter_id =  
query_context_proto->bloom_filters_size();
-        serialization::BloomFilter *bloom_filter_proto = 
query_context_proto->add_bloom_filters();
-
-        // Modify the bloom filter properties based on the statistics of the 
relation.
-        setBloomFilterProperties(bloom_filter_proto, 
hash_joins_[node].referenced_stored_build_relation_);
-
-        // Add build-side bloom filter information to the corresponding hash 
table proto.
-        
query_context_proto->mutable_join_hash_tables(hash_joins_[node].join_hash_table_id_)
-            ->add_build_side_bloom_filter_id(bloom_filter_id);
-
-        probe_bloom_filter_info.insert(std::make_pair(bloom_filter_id, 
hash_joins_[node].probe_attributes_));
-      }
-
-      // Add probe-side bloom filter information to the corresponding hash 
table proto for each build-side bloom filter.
-      for (const std::pair<QueryContext::bloom_filter_id, 
std::vector<attribute_id>>
-               &bloom_filter_info : probe_bloom_filter_info) {
-        auto *probe_side_bloom_filter =
-            
query_context_proto->mutable_join_hash_tables(hash_joins_[origin_node].join_hash_table_id_)
-                                  ->add_probe_side_bloom_filters();
-        
probe_side_bloom_filter->set_probe_side_bloom_filter_id(bloom_filter_info.first);
-        for (const attribute_id &probe_attribute_id : 
bloom_filter_info.second) {
-          probe_side_bloom_filter->add_probe_side_attr_ids(probe_attribute_id);
-        }
-      }
-
-      // Add node dependencies from chained build nodes to origin node probe.
-      for (std::size_t i = 1; i < chained_nodes.size(); ++i) {  // Note: It 
starts from index 1.
-        
query_plan->addDirectDependency(hash_joins_[origin_node].join_operator_index_,
-                                        hash_joins_[origin_node + 
i].build_operator_index_,
-                                        true /* is_pipeline_breaker */);
-      }
-    }
-
-    // Update the origin node.
-    origin_node = chained_nodes.back() + 1;
-  }
-}
-
-void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter 
*bloom_filter_proto,
-                                                   const CatalogRelation 
*relation) {
-  const std::size_t cardinality = relation->estimateTupleCardinality();
-  if (cardinality < kOneThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kOneThousand / 
kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kVeryLowSparsityHash);
-  } else if (cardinality < kTenThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kTenThousand / 
kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kLowSparsityHash);
-  } else if (cardinality < kHundredThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kHundredThousand / 
kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kMediumSparsityHash);
-  } else {
-    bloom_filter_proto->set_bloom_filter_size(kMillion / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kHighSparsityHash);
-  }
-}
-
-}  // namespace optimizer
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionHeuristics.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.hpp 
b/query_optimizer/ExecutionHeuristics.hpp
deleted file mode 100644
index 8ad3b7a..0000000
--- a/query_optimizer/ExecutionHeuristics.hpp
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_
-#define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_
-
-#include <vector>
-
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/QueryPlan.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-namespace optimizer {
-
-/** \addtogroup QueryOptimizer
- *  @{
- */
-
-/**
- * @brief The ExecutionHeuristics compiles certain heuristics for an execution 
plan
- *        as it is being converted to a physical plan. These heuristics can 
then be
- *        used to optimize the execution plan after it has been generated.
- **/
-class ExecutionHeuristics {
- public:
-  static const std::size_t kOneHundred = 100;
-  static const std::size_t kOneThousand = 1000;
-  static const std::size_t kTenThousand = 10000;
-  static const std::size_t kHundredThousand = 100000;
-  static const std::size_t kMillion = 1000000;
-
-  static const std::size_t kCompressionFactor = 10;
-
-  static const std::size_t kVeryLowSparsityHash = 1;
-  static const std::size_t kLowSparsityHash = 2;
-  static const std::size_t kMediumSparsityHash = 5;
-  static const std::size_t kHighSparsityHash = 10;
-
-  /**
-   * @brief A simple internal class that holds information about various
-   *        hash joins within the execution plan for a query.
-   **/
-  struct HashJoinInfo {
-    HashJoinInfo(const QueryPlan::DAGNodeIndex build_operator_index,
-                 const QueryPlan::DAGNodeIndex join_operator_index,
-                 const CatalogRelation *referenced_stored_build_relation,
-                 const CatalogRelation *referenced_stored_probe_relation,
-                 std::vector<attribute_id> &&build_attributes,
-                 std::vector<attribute_id> &&probe_attributes,
-                 const QueryContext::join_hash_table_id join_hash_table_id)
-        : build_operator_index_(build_operator_index),
-          join_operator_index_(join_operator_index),
-          referenced_stored_build_relation_(referenced_stored_build_relation),
-          referenced_stored_probe_relation_(referenced_stored_probe_relation),
-          build_attributes_(std::move(build_attributes)),
-          probe_attributes_(std::move(probe_attributes)),
-          join_hash_table_id_(join_hash_table_id) {
-    }
-
-    const QueryPlan::DAGNodeIndex build_operator_index_;
-    const QueryPlan::DAGNodeIndex join_operator_index_;
-    const CatalogRelation *referenced_stored_build_relation_;
-    const CatalogRelation *referenced_stored_probe_relation_;
-    const std::vector<attribute_id> build_attributes_;
-    const std::vector<attribute_id> probe_attributes_;
-    const QueryContext::join_hash_table_id join_hash_table_id_;
-  };
-
-
-  /**
-   * @brief Constructor.
-   **/
-  ExecutionHeuristics() {}
-
-  /**
-   * @brief Saves information about a hash join used within the execution plan
-   *        for a query.
-   *
-   * @param build_operator_index Index of the build operator of the hash join.
-   * @param join_operator_index Index of the join operator of the hash join.
-   * @param build_relation_id Id of the relation on which hash table is being 
built.
-   * @param probe_relation_id Id of the relation on which hash table is being 
probed.
-   * @param build_attributes List of attributes on which hash table is being 
built.
-   * @param probe_attributes List of attributes on which hash table is being 
probed.
-   * @param join_hash_table_id Id of the hash table which refers to the actual 
hash
-   *        table within the query context.
-   **/
-  inline void addHashJoinInfo(const QueryPlan::DAGNodeIndex 
build_operator_index,
-                              const QueryPlan::DAGNodeIndex 
join_operator_index,
-                              const CatalogRelation 
*referenced_stored_build_relation,
-                              const CatalogRelation 
*referenced_stored_probe_relation,
-                              std::vector<attribute_id> &&build_attributes,
-                              std::vector<attribute_id> &&probe_attributes,
-                              const QueryContext::join_hash_table_id 
join_hash_table_id) {
-    hash_joins_.push_back(HashJoinInfo(build_operator_index,
-                                       join_operator_index,
-                                       referenced_stored_build_relation,
-                                       referenced_stored_probe_relation,
-                                       std::move(build_attributes),
-                                       std::move(probe_attributes),
-                                       join_hash_table_id));
-  }
-
-  /**
-   * @brief Optimize the execution plan based on heuristics generated
-   *        during physical plan to execution plan conversion.
-   *
-   * @param query_plan A mutable reference to the query execution plan.
-   * @param query_context_proto A mutable reference to the protobuf 
representation
-   *        of the query context.
-   **/
-  void optimizeExecutionPlan(QueryPlan *query_plan, 
serialization::QueryContext *query_context_proto);
-
-  /**
-   * @brief Set the properties of the bloom filter proto based on the 
statistics
-   *        of the given relation.
-   *
-   * @param bloom_filter_proto A mutable reference to the bloom filter 
protobuf representation.
-   * @param relation The catalog relation on which bloom filter is being built.
-   **/
-  void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
-                                const CatalogRelation *relation);
-
- private:
-  std::vector<HashJoinInfo> hash_joins_;
-
-  DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics);
-};
-
-/** @} */
-
-}  // namespace optimizer
-}  // namespace quickstep
-
-#endif /* QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp 
b/query_optimizer/PhysicalGenerator.cpp
index 8f19702..9db4037 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -26,6 +26,7 @@
 #include "query_optimizer/Validator.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/AttachLIPFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 #include "query_optimizer/rules/SwapProbeBuild.hpp"
@@ -49,6 +50,12 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is 
suitable "
             "for queries on star-schema tables.");
 
+DEFINE_bool(use_lip_filters, false,
+            "If true, use LIP (Lookahead Information Passing) filters to 
accelerate "
+            "query processing. LIP filters are effective for queries on star 
schema "
+            "tables (e.g. the SSB benchmark) and snowflake schema tables (e.g. 
the "
+            "TPC-H benchmark).");
+
 DEFINE_bool(visualize_plan, false,
             "If true, visualize the final physical plan into a graph in DOT 
format "
             "(DOT is a plain text graph description language). Then print the "
@@ -95,11 +102,16 @@ P::PhysicalPtr PhysicalGenerator::generateInitialPlan(
 
 P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   std::vector<std::unique_ptr<Rule<P::Physical>>> rules;
+  rules.emplace_back(new PruneColumns());
   if (FLAGS_reorder_hash_joins) {
     rules.emplace_back(new StarSchemaHashJoinOrderOptimization());
+    rules.emplace_back(new PruneColumns());
+  } else {
+    rules.emplace_back(new SwapProbeBuild());
+  }
+  if (FLAGS_use_lip_filters) {
+    rules.emplace_back(new AttachLIPFilters());
   }
-  rules.emplace_back(new PruneColumns());
-  rules.emplace_back(new SwapProbeBuild());
 
   for (std::unique_ptr<Rule<P::Physical>> &rule : rules) {
     physical_plan_ = rule->apply(physical_plan_);
@@ -110,7 +122,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   DVLOG(4) << "Optimized physical plan:\n" << physical_plan_->toString();
 
   if (FLAGS_visualize_plan) {
-  quickstep::PlanVisualizer plan_visualizer;
+    quickstep::PlanVisualizer plan_visualizer;
     std::cerr << "\n" << plan_visualizer.visualize(physical_plan_) << "\n";
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp 
b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 8d254fa..1075739 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -358,7 +358,7 @@ double 
StarSchemaSimpleCostModel::estimateSelectivityForPredicate(
           std::static_pointer_cast<const E::LogicalAnd>(filter_predicate);
       double selectivity = 1.0;
       for (const auto &predicate : logical_and->operands()) {
-        selectivity = selectivity * estimateSelectivityForPredicate(predicate, 
physical_plan);
+        selectivity = std::min(selectivity, 
estimateSelectivityForPredicate(predicate, physical_plan));
       }
       return selectivity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt 
b/query_optimizer/physical/CMakeLists.txt
index 3b7d3f0..5c2cd0b 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -27,6 +27,7 @@ add_library(quickstep_queryoptimizer_physical_HashJoin 
HashJoin.cpp HashJoin.hpp
 add_library(quickstep_queryoptimizer_physical_InsertSelection 
InsertSelection.cpp InsertSelection.hpp)
 add_library(quickstep_queryoptimizer_physical_InsertTuple InsertTuple.cpp 
InsertTuple.hpp)
 add_library(quickstep_queryoptimizer_physical_Join ../../empty_src.cpp 
Join.hpp)
+add_library(quickstep_queryoptimizer_physical_LIPFilterConfiguration 
../../empty_src.cpp LIPFilterConfiguration.hpp)
 add_library(quickstep_queryoptimizer_physical_NestedLoopsJoin 
NestedLoopsJoin.cpp NestedLoopsJoin.hpp)
 add_library(quickstep_queryoptimizer_physical_PatternMatcher 
../../empty_src.cpp PatternMatcher.hpp)
 add_library(quickstep_queryoptimizer_physical_Physical ../../empty_src.cpp 
Physical.hpp)
@@ -150,6 +151,10 @@ 
target_link_libraries(quickstep_queryoptimizer_physical_Join
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_queryoptimizer_physical_NestedLoopsJoin
                       glog
                       quickstep_queryoptimizer_OptimizerTree
@@ -237,6 +242,7 @@ 
target_link_libraries(quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
@@ -279,6 +285,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_InsertSelection
                       quickstep_queryoptimizer_physical_InsertTuple
                       quickstep_queryoptimizer_physical_Join
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
                       quickstep_queryoptimizer_physical_PatternMatcher
                       quickstep_queryoptimizer_physical_Physical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/physical/LIPFilterConfiguration.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/LIPFilterConfiguration.hpp 
b/query_optimizer/physical/LIPFilterConfiguration.hpp
new file mode 100644
index 0000000..62a6149
--- /dev/null
+++ b/query_optimizer/physical/LIPFilterConfiguration.hpp
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_LIP_FILTER_CONFIGURATION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_LIP_FILTER_CONFIGURATION_HPP_
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ *  @{
+ */
+
+class Physical;
+typedef std::shared_ptr<const Physical> PhysicalPtr;
+
+/**
+ * @brief Optimizer-side record describing one LIP filter that is to be built.
+ *
+ * Every member is const, so a record cannot be modified after construction.
+ */
+struct LIPFilterBuildInfo {
+  /** @brief The attribute to build the LIP filter with. */
+  const expressions::AttributeReferencePtr build_attribute;
+
+  /** @brief The LIP filter's cardinality. */
+  const std::size_t filter_cardinality;
+
+  /** @brief The LIP filter's type. */
+  const LIPFilterType filter_type;
+
+  /**
+   * @brief Constructor. Stores a copy of each argument in the member of the
+   *        corresponding name.
+   *
+   * @param build_attribute_in The attribute to build the LIP filter with.
+   * @param filter_cardinality_in The LIP filter's cardinality.
+   * @param filter_type_in The LIP filter's type.
+   */
+  LIPFilterBuildInfo(
+      const expressions::AttributeReferencePtr &build_attribute_in,
+      const std::size_t filter_cardinality_in,
+      const LIPFilterType &filter_type_in)
+      : build_attribute(build_attribute_in),
+        filter_cardinality(filter_cardinality_in),
+        filter_type(filter_type_in) {}
+};
+
+/**
+ * @brief Optimizer-side record describing one LIP filter that is to be probed.
+ *
+ * Every member is const, so a record cannot be modified after construction.
+ */
+struct LIPFilterProbeInfo {
+  /** @brief The attribute to probe the LIP filter with. */
+  const expressions::AttributeReferencePtr probe_attribute;
+
+  /** @brief The attribute that the LIP filter is built with. */
+  const expressions::AttributeReferencePtr build_attribute;
+
+  /** @brief The physical node that the LIP filter's builder is attached to. */
+  const PhysicalPtr builder;
+
+  /**
+   * @brief Constructor. Stores a copy of each argument in the member of the
+   *        corresponding name.
+   *
+   * @param probe_attribute_in The attribute to probe the LIP filter with.
+   * @param build_attribute_in The attribute that the LIP filter is built with.
+   * @param builder_in The physical node that the LIP filter's builder is
+   *        attached to.
+   */
+  LIPFilterProbeInfo(
+      const expressions::AttributeReferencePtr &probe_attribute_in,
+      const expressions::AttributeReferencePtr &build_attribute_in,
+      const PhysicalPtr &builder_in)
+      : probe_attribute(probe_attribute_in),
+        build_attribute(build_attribute_in),
+        builder(builder_in) {}
+};
+
+
+class LIPFilterConfiguration;
+typedef std::shared_ptr<const LIPFilterConfiguration> 
LIPFilterConfigurationPtr;
+
+/**
+ * @brief Configuration information of all the LIP filters in a query plan.
+ *
+ * Holds two tables, both keyed by physical node: the LIP filter builders
+ * attached to a node and the LIP filter probers attached to a node. Entries
+ * for the same node are kept in the order in which they were added.
+ */
+class LIPFilterConfiguration {
+ public:
+  /**
+   * @brief Constructor. Creates a configuration with no builders or probers.
+   */
+  LIPFilterConfiguration() {}
+
+  /**
+   * @brief Add information for a LIP filter builder.
+   *
+   * @param build_attribute The attribute to build the LIP filter with.
+   * @param builder The physical node to attach the LIP filter builder to.
+   * @param filter_size The LIP filter's cardinality.
+   * @param filter_type The LIP filter's type.
+   */
+  void addBuildInfo(const expressions::AttributeReferencePtr &build_attribute,
+                    const PhysicalPtr &builder,
+                    const std::size_t filter_size,
+                    const LIPFilterType &filter_type) {
+    // operator[] creates the (empty) per-node list on first use.
+    std::vector<LIPFilterBuildInfo> &builders_on_node = build_info_map_[builder];
+    builders_on_node.emplace_back(build_attribute, filter_size, filter_type);
+  }
+
+  /**
+   * @brief Add information for a LIP filter prober.
+   *
+   * @param probe_attribute The attribute to probe the LIP filter with.
+   * @param prober The physical node to attach the LIP filter prober to.
+   * @param build_attribute The attribute that the LIP filter is built with.
+   * @param builder The physical node that the LIP filter's builder is
+   *        attached to.
+   */
+  void addProbeInfo(const expressions::AttributeReferencePtr &probe_attribute,
+                    const PhysicalPtr &prober,
+                    const expressions::AttributeReferencePtr &build_attribute,
+                    const PhysicalPtr &builder) {
+    // operator[] creates the (empty) per-node list on first use.
+    std::vector<LIPFilterProbeInfo> &probers_on_node = probe_info_map_[prober];
+    probers_on_node.emplace_back(probe_attribute, build_attribute, builder);
+  }
+
+  /**
+   * @brief Get all the LIP filter builders.
+   *
+   * @return A map where each key is a physical node and each mapped value is
+   *         a vector of all the LIP filter builders that are attached to the
+   *         physical node.
+   */
+  const std::map<PhysicalPtr, std::vector<LIPFilterBuildInfo>>& getBuildInfoMap() const {
+    return build_info_map_;
+  }
+
+  /**
+   * @brief Get all the LIP filter probers.
+   *
+   * @return A map where each key is a physical node and each mapped value is
+   *         a vector of all the LIP filter probers that are attached to the
+   *         physical node.
+   */
+  const std::map<PhysicalPtr, std::vector<LIPFilterProbeInfo>>& getProbeInfoMap() const {
+    return probe_info_map_;
+  }
+
+ private:
+  // Builders, grouped by the physical node they are attached to.
+  std::map<PhysicalPtr, std::vector<LIPFilterBuildInfo>> build_info_map_;
+  // Probers, grouped by the physical node they are attached to.
+  std::map<PhysicalPtr, std::vector<LIPFilterProbeInfo>> probe_info_map_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterConfiguration);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_LIP_FILTER_CONFIGURATION_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/physical/TopLevelPlan.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TopLevelPlan.hpp 
b/query_optimizer/physical/TopLevelPlan.hpp
index 8f07dec..7dfc2b6 100644
--- a/query_optimizer/physical/TopLevelPlan.hpp
+++ b/query_optimizer/physical/TopLevelPlan.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "utility/Macros.hpp"
@@ -89,6 +90,29 @@ class TopLevelPlan : public Physical {
     return shared_subplans_[index];
   }
 
+  /**
+   * @brief Creates a copy of the TopLevelPlan with lip_filter_configuration_
+   *        replaced by \p new_lip_filter_configuration.
+   *
+   * @param new_lip_filter_configuration The new lip_filter_configuration to be
+   *        substituted for the existing one.
+   * @return A copy of this TopLevelPlan with the new lip_filter_configuration.
+   */
+  TopLevelPlanPtr copyWithLIPFilterConfiguration(
+      const LIPFilterConfigurationPtr &new_lip_filter_configuration) const {
+    return TopLevelPlan::Create(plan_,
+                                shared_subplans_,
+                                uncorrelated_subquery_map_,
+                                new_lip_filter_configuration);
+  }
+
+  /**
+   * @return The LIPFilter configuration information for the overall query 
plan.
+   */
+  const LIPFilterConfigurationPtr& lip_filter_configuration() const {
+    return lip_filter_configuration_;
+  }
+
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override {
     DCHECK_EQ(getNumChildren(), new_children.size());
@@ -125,18 +149,22 @@ class TopLevelPlan : public Physical {
    *
    * @param plan The query plan.
    * @param shared_subplans The subplans referenced in the main input plan.
-   * @param Map from the expression ID of an attribute reference to the
-   *        uncorrelated subquery that produces the attribute.
+   * @param uncorrelated_subquery_map Map from the expression ID of an 
attribute
+   *        reference to the uncorrelated subquery that produces the attribute.
+   * @param lip_filter_configuration The LIPFilter configuration information
+   *        for the overall query plan.
    * @return An immutable TopLevelPlan.
    */
   static TopLevelPlanPtr Create(
       const PhysicalPtr &plan,
       const std::vector<PhysicalPtr> &shared_subplans = {},
       const std::unordered_map<expressions::ExprId, int> 
&uncorrelated_subquery_map
-          = std::unordered_map<expressions::ExprId, int>()) {
+          = std::unordered_map<expressions::ExprId, int>(),
+      const LIPFilterConfigurationPtr &lip_filter_configuration = nullptr) {
     return TopLevelPlanPtr(new TopLevelPlan(plan,
                                             shared_subplans,
-                                            uncorrelated_subquery_map));
+                                            uncorrelated_subquery_map,
+                                            lip_filter_configuration));
   }
 
  protected:
@@ -151,10 +179,12 @@ class TopLevelPlan : public Physical {
  private:
   TopLevelPlan(const PhysicalPtr &plan,
                const std::vector<PhysicalPtr> &shared_subplans,
-               const std::unordered_map<expressions::ExprId, int> 
&uncorrelated_subquery_map)
+               const std::unordered_map<expressions::ExprId, int> 
&uncorrelated_subquery_map,
+               const LIPFilterConfigurationPtr &lip_filter_configuration)
       : plan_(plan),
         shared_subplans_(shared_subplans),
-        uncorrelated_subquery_map_(uncorrelated_subquery_map) {
+        uncorrelated_subquery_map_(uncorrelated_subquery_map),
+        lip_filter_configuration_(lip_filter_configuration) {
     addChild(plan);
     for (const PhysicalPtr &shared_subplan : shared_subplans) {
       addChild(shared_subplan);
@@ -165,6 +195,7 @@ class TopLevelPlan : public Physical {
   // Stored in the topological ordering based on dependencies.
   std::vector<PhysicalPtr> shared_subplans_;
   std::unordered_map<expressions::ExprId, int> uncorrelated_subquery_map_;
+  LIPFilterConfigurationPtr lip_filter_configuration_;
 
   DISALLOW_COPY_AND_ASSIGN(TopLevelPlan);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/AttachLIPFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.cpp 
b/query_optimizer/rules/AttachLIPFilters.cpp
new file mode 100644
index 0000000..090fb8c
--- /dev/null
+++ b/query_optimizer/rules/AttachLIPFilters.cpp
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/AttachLIPFilters.hpp"
+
+#include <map>
+#include <set>
+#include <unordered_set>
+#include <unordered_map>
+#include <vector>
+#include <utility>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+/**
+ * Entry point of the rule. Walks the plan under the given TopLevelPlan,
+ * records LIP filter builders/probers in a fresh LIPFilterConfiguration, and
+ * returns a copy of the TopLevelPlan carrying that configuration. If nothing
+ * was attached, the input plan is returned unchanged.
+ *
+ * @param input The physical plan; must be a TopLevelPlan (DCHECKed).
+ * @return The plan with the LIP filter configuration attached, or \p input.
+ */
+P::PhysicalPtr AttachLIPFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlanPtr top_level_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
+
+  // All per-plan state is rebuilt on every invocation. The memo tables are
+  // keyed by node only, while their contents depend on the node's ancestors
+  // and on the cost model, so entries left over from a previous apply() must
+  // not be reused for this plan.
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          top_level_plan->shared_subplans()));
+  lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
+  build_side_info_.clear();
+  probe_side_info_.clear();
+
+  std::set<E::ExprId> already_filtered_attributes;
+  attachLIPFilters(NodeList(input), &already_filtered_attributes);
+
+  P::PhysicalPtr output = input;
+  if (!lip_filter_configuration_->getBuildInfoMap().empty() ||
+      !lip_filter_configuration_->getProbeInfoMap().empty()) {
+    // Ownership of the configuration moves into the returned plan.
+    output = top_level_plan->copyWithLIPFilterConfiguration(
+        P::LIPFilterConfigurationPtr(lip_filter_configuration_.release()));
+  }
+
+  // The memo tables hold shared pointers to plan nodes; drop them so that
+  // this rule object does not keep the plan alive after returning.
+  build_side_info_.clear();
+  probe_side_info_.clear();
+
+  return output;
+}
+
+/**
+ * Post-order traversal that attaches LIP filters to the node at the head of
+ * \p path after all of its children have been processed.
+ *
+ * @param path The node to process, linked to its chain of ancestors.
+ * @param already_filtered_attributes In/out set of the ids of build-side
+ *        (source) attributes for which a filter has been attached in this
+ *        subtree; the ids collected from all children are merged into it.
+ */
+void AttachLIPFilters::attachLIPFilters(
+    const NodeList &path,
+    std::set<expressions::ExprId> *already_filtered_attributes) {
+  const P::PhysicalPtr &node = path.node;
+
+  // Children first; whatever was filtered below also counts as filtered here.
+  for (const auto &child : node->children()) {
+    std::set<E::ExprId> filtered_in_subtree;
+    attachLIPFilters(path.cons(child), &filtered_in_subtree);
+    already_filtered_attributes->insert(filtered_in_subtree.begin(),
+                                        filtered_in_subtree.end());
+  }
+
+  // Only HashJoin/Selection/Aggregate nodes can probe LIP filters. Pick the
+  // input whose tuples would be probed.
+  P::PhysicalPtr probe_child;
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kHashJoin:
+      probe_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+      break;
+    case P::PhysicalType::kSelection:
+      probe_child = std::static_pointer_cast<const P::Selection>(node)->input();
+      break;
+    case P::PhysicalType::kAggregate:
+      probe_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+      break;
+    default:
+      return;
+  }
+  if (probe_child == nullptr) {
+    return;
+  }
+
+  // Skip probe inputs whose estimated cardinality does not exceed 10 million.
+  if (cost_model_->estimateCardinality(probe_child) <= 10000000) {
+    return;
+  }
+
+  const auto &candidates = getProbeSideInfo(path.cons(probe_child));
+  if (candidates.empty()) {
+    return;
+  }
+
+  // Keep, for each probe attribute, the best candidate as ranked by
+  // LIPFilterInfo::isBetterThan(). emplace() leaves an existing entry
+  // untouched, in which case the entry is replaced only by a better one.
+  std::map<E::AttributeReferencePtr, LIPFilterInfoPtr> best_by_attribute;
+  for (const LIPFilterInfoPtr &candidate : candidates) {
+    const auto emplaced = best_by_attribute.emplace(candidate->attribute, candidate);
+    if (!emplaced.second &&
+        LIPFilterInfo::isBetterThan(*candidate, *emplaced.first->second)) {
+      emplaced.first->second = candidate;
+    }
+  }
+
+  // Register a builder/prober pair for every winner whose source attribute
+  // has not already been used for a filter in this subtree.
+  for (const auto &selected : best_by_attribute) {
+    const E::AttributeReferencePtr &probe_attribute = selected.first;
+    const LIPFilterInfo &filter = *selected.second;
+    const E::ExprId source_attr_id = filter.source_attribute->id();
+    if (already_filtered_attributes->count(source_attr_id) != 0) {
+      continue;
+    }
+
+    lip_filter_configuration_->addBuildInfo(
+        filter.source_attribute,
+        filter.source,
+        filter.estimated_cardinality * 8,
+        LIPFilterType::kSingleIdentityHashFilter);
+    lip_filter_configuration_->addProbeInfo(
+        probe_attribute,
+        node,
+        filter.source_attribute,
+        filter.source);
+    already_filtered_attributes->emplace(source_attr_id);
+  }
+}
+
+/**
+ * Returns the candidate LIP filters that can be built at or below the node
+ * at the head of \p path. The result is computed once per node and memoized
+ * in build_side_info_.
+ *
+ * @param path The node of interest, linked to its chain of ancestors.
+ * @return The memoized candidate list for the node.
+ */
+const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
+    ::getBuildSideInfo(const NodeList &path) {
+  const P::PhysicalPtr &node = path.node;
+
+  const auto cached_it = build_side_info_.find(node);
+  if (cached_it != build_side_info_.end()) {
+    return cached_it->second;
+  }
+
+  std::vector<LIPFilterInfoPtr> lip_filters;
+
+  // 1. Gather candidate LIP filters propagated from descendant nodes.
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kHashJoin: {
+      for (const P::PhysicalPtr &child : node->children()) {
+        const std::vector<LIPFilterInfoPtr> &child_filters =
+            getBuildSideInfo(path.cons(child));
+        lip_filters.insert(lip_filters.end(),
+                           child_filters.begin(),
+                           child_filters.end());
+      }
+      break;
+    }
+    default:
+      break;
+  }
+
+  // 2. Consider the parent physical node. If it is a HashJoin,
+  // then each build-side join attribute is a candidate LIP filter
+  // which can be built by the BuildHashOperator that corresponds
+  // to the parent HashJoin node.
+  const NodeList *parent = path.cdr();
+  P::HashJoinPtr hash_join;
+  if (parent != nullptr &&
+      P::SomeHashJoin::MatchesWithConditionalCast(parent->node, &hash_join)) {
+    const P::PhysicalPtr &build_node = hash_join->right();
+    // TODO(jianqiao): consider probe-side info to allow cascading propagation.
+    const double selectivity = cost_model_->estimateSelectivity(build_node);
+    // Only consider attributes that are selective.
+    if (selectivity < 1.0) {
+      const std::size_t cardinality = cost_model_->estimateCardinality(build_node);
+      for (const auto &attr : hash_join->right_join_attributes()) {
+        // The parent HashJoin node is recorded as the filter's source.
+        lip_filters.emplace_back(
+            std::make_shared<LIPFilterInfo>(attr,
+                                            parent->node,
+                                            path.depth,
+                                            selectivity,
+                                            cardinality));
+      }
+    }
+  }
+
+  // Memoize and hand back the stored copy (a std::map keeps references to
+  // its elements valid across later insertions).
+  return build_side_info_.emplace(node, std::move(lip_filters)).first->second;
+}
+
+/**
+ * Returns the candidate LIP filters that can be probed at the node at the
+ * head of \p path. The result is computed once per node and memoized in
+ * probe_side_info_. A node without a parent has no candidates.
+ *
+ * @param path The node of interest, linked to its chain of ancestors.
+ * @return The memoized candidate list for the node.
+ */
+const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
+    ::getProbeSideInfo(const NodeList &path) {
+  const P::PhysicalPtr &node = path.node;
+
+  const auto cached_it = probe_side_info_.find(node);
+  if (cached_it != probe_side_info_.end()) {
+    return cached_it->second;
+  }
+
+  std::vector<LIPFilterInfoPtr> lip_filters;
+  const NodeList *parent = path.cdr();
+  if (parent != nullptr) {
+    // 1. Inherit the parent's candidates, restricted to those whose probe
+    // attribute is among this node's output attributes.
+    const auto &inherited_filters = getProbeSideInfo(*parent);
+    if (!inherited_filters.empty()) {
+      std::unordered_set<E::ExprId> visible_attribute_ids;
+      for (const auto &attr : node->getOutputAttributes()) {
+        visible_attribute_ids.emplace(attr->id());
+      }
+      for (const auto &info : inherited_filters) {
+        if (visible_attribute_ids.count(info->attribute->id()) != 0) {
+          lip_filters.emplace_back(info);
+        }
+      }
+    }
+
+    // 2. If the parent is an InnerHashJoin or LeftSemiHashJoin, the LIP
+    // filters available on its build side can be probed with the matching
+    // probe-side join attribute.
+    P::HashJoinPtr hash_join;
+    const bool parent_is_hash_join =
+        P::SomeHashJoin::MatchesWithConditionalCast(parent->node, &hash_join);
+    if (parent_is_hash_join &&
+        (hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin ||
+         hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin)) {
+      const P::PhysicalPtr &build_side_child = hash_join->right();
+
+      // Build-side join attribute id -> paired probe-side join attribute.
+      // emplace() keeps the first pairing if a build attribute repeats.
+      std::unordered_map<E::ExprId, E::AttributeReferencePtr> probe_attribute_for;
+      const auto &probe_join_attributes = hash_join->left_join_attributes();
+      const auto &build_join_attributes = hash_join->right_join_attributes();
+      for (std::size_t i = 0; i < probe_join_attributes.size(); ++i) {
+        probe_attribute_for.emplace(build_join_attributes[i]->id(),
+                                    probe_join_attributes[i]);
+      }
+
+      for (const auto &info : getBuildSideInfo(parent->cons(build_side_child))) {
+        const auto match_it = probe_attribute_for.find(info->attribute->id());
+        if (match_it == probe_attribute_for.end()) {
+          continue;
+        }
+        // Same filter, re-expressed on the probe attribute; the build-side
+        // attribute is remembered as the source attribute.
+        lip_filters.emplace_back(
+            std::make_shared<LIPFilterInfo>(match_it->second,
+                                            info->source,
+                                            info->depth,
+                                            info->estimated_selectivity,
+                                            info->estimated_cardinality,
+                                            info->attribute));
+      }
+    }
+  }
+
+  // Memoize and hand back the stored copy.
+  return probe_side_info_.emplace(node, std::move(lip_filters)).first->second;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/AttachLIPFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.hpp 
b/query_optimizer/rules/AttachLIPFilters.hpp
new file mode 100644
index 0000000..b8cfc39
--- /dev/null
+++ b/query_optimizer/rules/AttachLIPFilters.hpp
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to attach LIPFilters.
+ *
+ * The rule expects a TopLevelPlan as input and, when at least one LIP filter
+ * is attached, returns a copy of it that carries a LIPFilterConfiguration.
+ */
+class AttachLIPFilters : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  AttachLIPFilters() {}
+
+  ~AttachLIPFilters() override {}
+
+  std::string getName() const override {
+    return "AttachLIPFilters";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  /**
+   * @brief Internal data structure for passing around LIPFilter information.
+   */
+  struct LIPFilterInfo {
+    // The attribute this candidate is expressed on.
+    expressions::AttributeReferencePtr attribute;
+    // The physical node recorded as the place where the filter is built.
+    physical::PhysicalPtr source;
+    // Depth in the plan traversal at which the candidate was created.
+    int depth;
+    // Estimated selectivity associated with the filter.
+    double estimated_selectivity;
+    // Estimated cardinality associated with the filter.
+    std::size_t estimated_cardinality;
+    // The attribute the filter is built on; equals 'attribute' unless a
+    // distinct source attribute was supplied to the constructor.
+    expressions::AttributeReferencePtr source_attribute;
+
+    /**
+     * @brief Constructor.
+     *
+     * @param attribute_in Value for \c attribute.
+     * @param source_in Value for \c source.
+     * @param depth_in Value for \c depth.
+     * @param estimated_selectivity_in Value for \c estimated_selectivity.
+     * @param estimated_cardinality_in Value for \c estimated_cardinality.
+     * @param source_attribute_in Value for \c source_attribute; when null,
+     *        \p attribute_in is used instead.
+     */
+    LIPFilterInfo(const expressions::AttributeReferencePtr &attribute_in,
+                  const physical::PhysicalPtr &source_in,
+                  const int depth_in,
+                  const double estimated_selectivity_in,
+                  const std::size_t estimated_cardinality_in,
+                  const expressions::AttributeReferencePtr &source_attribute_in = nullptr)
+        : attribute(attribute_in),
+          source(source_in),
+          depth(depth_in),
+          estimated_selectivity(estimated_selectivity_in),
+          estimated_cardinality(estimated_cardinality_in),
+          source_attribute(
+              source_attribute_in != nullptr
+                  ? source_attribute_in
+                  : attribute_in) {}
+
+    /**
+     * @brief Ranks two candidates.
+     *
+     * @return True if \p a has the smaller estimated selectivity, or, when
+     *         the selectivities compare equal, if \p a has the greater depth.
+     */
+    static bool isBetterThan(const LIPFilterInfo &a, const LIPFilterInfo &b) {
+      if (a.estimated_selectivity != b.estimated_selectivity) {
+        return a.estimated_selectivity < b.estimated_selectivity;
+      }
+      return a.depth > b.depth;
+    }
+  };
+
+  typedef std::shared_ptr<const LIPFilterInfo> LIPFilterInfoPtr;
+
+  /**
+   * @brief Functional list data structure for internal use.
+   *
+   * Each element holds a plan node and a non-owning pointer to the next
+   * element, so an element must not outlive the element it was consed onto.
+   */
+  struct NodeList {
+    // The plan node at the head of the list.
+    const physical::PhysicalPtr node;
+    // The rest of the list (not owned); null for a single-element list.
+    const NodeList *next;
+    // 0 for a list made from a single node; one more per cons().
+    const int depth;
+
+    // Creates a single-element list.
+    explicit NodeList(const physical::PhysicalPtr &node_in)
+        : node(node_in),
+          next(nullptr),
+          depth(0) {}
+
+    // Creates an element with an explicit tail and depth.
+    NodeList(const physical::PhysicalPtr &node_in,
+             const NodeList *next_in,
+             const int depth_in)
+        : node(node_in),
+          next(next_in),
+          depth(depth_in) {}
+
+    // Returns the tail of the list, or null if there is none.
+    inline const NodeList *cdr() const {
+      return next;
+    }
+
+    // Returns a new list with new_node in front of this list.
+    inline const NodeList cons(const physical::PhysicalPtr &new_node) const {
+      return NodeList(new_node, this, depth + 1);
+    }
+  };
+
+  // Recursively attaches LIP filters to the node at the head of 'path' and
+  // to its descendants.
+  void attachLIPFilters(const NodeList &path,
+                        std::set<expressions::ExprId> *already_filtered_attributes);
+
+  // Candidate filters that can be built for the node at the head of 'path'.
+  const std::vector<LIPFilterInfoPtr>& getBuildSideInfo(const NodeList &path);
+
+  // Candidate filters that can be probed at the node at the head of 'path'.
+  const std::vector<LIPFilterInfoPtr>& getProbeSideInfo(const NodeList &path);
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+  // Per-node memo tables for getBuildSideInfo() / getProbeSideInfo().
+  std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> build_side_info_;
+  std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> probe_side_info_;
+  std::unique_ptr<physical::LIPFilterConfiguration> lip_filter_configuration_;
+
+  DISALLOW_COPY_AND_ASSIGN(AttachLIPFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt 
b/query_optimizer/rules/CMakeLists.txt
index d9709ce..29875f6 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -18,6 +18,7 @@
 add_subdirectory(tests)
 
 # Declare micro-libs:
+add_library(quickstep_queryoptimizer_rules_AttachLIPFilters 
AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp 
BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp 
CollapseProject.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp 
GenerateJoins.hpp)
@@ -36,6 +37,21 @@ add_library(quickstep_queryoptimizer_rules_UnnestSubqueries 
UnnestSubqueries.cpp
 
 
 # Link dependencies:
+target_link_libraries(quickstep_queryoptimizer_rules_AttachLIPFilters
+                      
quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule
                       glog
                       quickstep_queryoptimizer_rules_Rule
@@ -121,12 +137,14 @@ 
target_link_libraries(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOpti
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_PatternMatcher
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_DisjointTreeForest
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_costmodel_SimpleCostModel
@@ -187,6 +205,7 @@ 
target_link_libraries(quickstep_queryoptimizer_rules_UpdateExpression
 # Module all-in-one library:
 add_library(quickstep_queryoptimizer_rules ../../empty_src.cpp 
OptimizerRulesModule.hpp)
 target_link_libraries(quickstep_queryoptimizer_rules
+                      quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_GenerateJoins

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp 
b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 946d316..5906b98 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -19,6 +19,8 @@
 
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 
+#include <algorithm>
+#include <map>
 #include <memory>
 #include <set>
 #include <unordered_map>
@@ -28,11 +30,13 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/PatternMatcher.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/DisjointTreeForest.hpp"
 
 #include "glog/logging.h"
 
@@ -74,6 +78,9 @@ P::PhysicalPtr 
StarSchemaHashJoinOrderOptimization::applyInternal(const P::Physi
     JoinGroupInfo *join_group = nullptr;
     if (parent_join_group == nullptr || !is_valid_cascading_hash_join) {
       new_join_group.reset(new JoinGroupInfo());
+      for (const auto &attr : input->getOutputAttributes()) {
+        new_join_group->referenced_attributes.emplace(attr->id());
+      }
       join_group = new_join_group.get();
     } else {
       join_group = parent_join_group;
@@ -146,7 +153,10 @@ physical::PhysicalPtr 
StarSchemaHashJoinOrderOptimization::generatePlan(
         i,
         tables[i],
         cost_model_->estimateCardinality(tables[i]),
-        cost_model_->estimateSelectivity(tables[i]));
+        cost_model_->estimateSelectivity(tables[i]),
+        CountSharedAttributes(join_group.referenced_attributes,
+                              tables[i]->getOutputAttributes()),
+        tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate);
   }
 
   // Auxiliary mapping info.
@@ -163,9 +173,25 @@ physical::PhysicalPtr 
StarSchemaHashJoinOrderOptimization::generatePlan(
     }
   }
 
-  // Create a join graph where tables are vertices, and add an edge between vertices
-  // t1 and t2 for each join predicate t1.x = t2.y
-  std::vector<std::unordered_set<std::size_t>> 
join_graph(table_info_storage.size());
+  // The pool of tables.
+  std::set<TableInfo*> remaining_tables;
+  for (auto &table_info : table_info_storage) {
+    remaining_tables.emplace(&table_info);
+  }
+
+  // The equal-join (e.g. =) operator defines an equivalence relation on the
+  // set of all the attributes. The disjoint set data structure is used to keep
+  // track of the equivalence classes that each attribute belongs to.
+  DisjointTreeForest<E::ExprId> join_attribute_forest;
+  for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
+    join_attribute_forest.makeSet(attr_id_pair.first);
+    join_attribute_forest.makeSet(attr_id_pair.second);
+    join_attribute_forest.merge(attr_id_pair.first, attr_id_pair.second);
+  }
+
+  // Map each equivalence class id to the members (e.g. <table id, attribute id>
+  // pairs) in that equivalence class.
+  std::map<std::size_t, std::map<std::size_t, E::ExprId>> 
join_attribute_groups;
   for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
     DCHECK(attribute_id_to_table_info_index_map.find(attr_id_pair.first)
                != attribute_id_to_table_info_index_map.end());
@@ -178,128 +204,148 @@ physical::PhysicalPtr 
StarSchemaHashJoinOrderOptimization::generatePlan(
         attribute_id_to_table_info_index_map[attr_id_pair.second];
     DCHECK_NE(first_table_idx, second_table_idx);
 
-    table_info_storage[first_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.first, attr_id_pair.second);
-    table_info_storage[second_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.second, attr_id_pair.first);
-
-    join_graph[first_table_idx].emplace(second_table_idx);
-    join_graph[second_table_idx].emplace(first_table_idx);
-  }
-
-  std::set<TableInfo*, TableInfoPtrLessComparator> 
table_info_ordered_by_priority;
-  for (std::size_t i = 0; i < table_info_storage.size(); ++i) {
-    table_info_ordered_by_priority.emplace(&table_info_storage[i]);
+    DCHECK_EQ(join_attribute_forest.find(attr_id_pair.first),
+              join_attribute_forest.find(attr_id_pair.second));
+    const std::size_t attr_group_id = 
join_attribute_forest.find(attr_id_pair.first);
+    auto &attr_group = join_attribute_groups[attr_group_id];
+    attr_group.emplace(first_table_idx, attr_id_pair.first);
+    attr_group.emplace(second_table_idx, attr_id_pair.second);
   }
 
-  // Contruct hash join tree.
   while (true) {
-    TableInfo *first_table_info = *table_info_ordered_by_priority.begin();
-    table_info_ordered_by_priority.erase(
-        table_info_ordered_by_priority.begin());
-    const std::size_t first_table_info_id = first_table_info->table_info_id;
-
-    TableInfo *second_table_info = nullptr;
-    std::set<TableInfo*, TableInfoPtrLessComparator>::iterator 
second_table_info_it;
-    for (auto candidate_table_info_it = table_info_ordered_by_priority.begin();
-         candidate_table_info_it != table_info_ordered_by_priority.end();
-         ++candidate_table_info_it) {
-      TableInfo *candidate_table_info = *candidate_table_info_it;
-      const std::size_t candidate_table_info_id = 
candidate_table_info->table_info_id;
-
-      if (join_graph[first_table_info_id].find(candidate_table_info_id)
-              == join_graph[first_table_info_id].end() &&
-          join_graph[candidate_table_info_id].find(first_table_info_id)
-              == join_graph[candidate_table_info_id].end()) {
-        continue;
-      } else if (second_table_info == nullptr) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-      }
-
-      bool is_likely_many_to_many_join = false;
-      for (const auto join_attr_pair : first_table_info->join_attribute_pairs) 
{
-        if 
(candidate_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != candidate_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
+    // Find the best probe/build pair out of the remaining tables.
+    // TODO(jianqiao): design better data structure to improve efficiency here.
+    std::unique_ptr<JoinPair> best_join = nullptr;
+    for (TableInfo *probe_table_info : remaining_tables) {
+      for (TableInfo *build_table_info : remaining_tables) {
+        if (probe_table_info != build_table_info) {
+          const std::size_t probe_table_id = probe_table_info->table_info_id;
+          const std::size_t build_table_id = build_table_info->table_info_id;
+          std::size_t num_join_attributes = 0;
+          double build_side_uniqueness = 1.0;
+          for (const auto &attr_group_pair : join_attribute_groups) {
+            const auto &attr_group = attr_group_pair.second;
+            auto probe_it = attr_group.find(probe_table_id);
+            auto build_it = attr_group.find(build_table_id);
+            if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+              ++num_join_attributes;
+              build_side_uniqueness *= std::max(
+                  1uL,
+                  cost_model_->estimateNumDistinctValues(
+                      build_it->second, build_table_info->table));
+            }
+          }
+          build_side_uniqueness /= build_table_info->estimated_cardinality;
+
+          if (num_join_attributes > 0) {
+            std::unique_ptr<JoinPair> new_join(
+                new JoinPair(probe_table_info,
+                             build_table_info,
+                             build_side_uniqueness >= 0.9,
+                             num_join_attributes));
+            if (best_join == nullptr || new_join->isBetterThan(*best_join)) {
+              best_join.reset(new_join.release());
+            }
+          }
         }
       }
-      for (const auto join_attr_pair : 
candidate_table_info->join_attribute_pairs) {
-        if (first_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != first_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
-        }
-      }
-      if (!is_likely_many_to_many_join) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-        break;
+    }
+
+    CHECK(best_join != nullptr);
+
+    TableInfo *selected_probe_table_info = best_join->probe;
+    TableInfo *selected_build_table_info = best_join->build;
+
+    // Swap probe/build sides if:
+    // (1) Build side is an aggregation with large number of groups, so that
+    //     there is a chance to push LIPFilters down the aggregation.
+    // (2) Build side's join attributes are not unique, and it has larger
+    //     cardinality than the probe side.
+    const std::size_t probe_num_groups_as_agg =
+        getEstimatedNumGroups(selected_probe_table_info->table);
+    const std::size_t build_num_groups_as_agg =
+        getEstimatedNumGroups(selected_build_table_info->table);
+    if (build_num_groups_as_agg > 1000000 || probe_num_groups_as_agg > 
1000000) {
+      if (build_num_groups_as_agg > probe_num_groups_as_agg) {
+        std::swap(selected_probe_table_info, selected_build_table_info);
       }
+    } else if ((!best_join->build_side_unique || 
best_join->num_join_attributes > 1) &&
+        selected_probe_table_info->estimated_cardinality < 
selected_build_table_info->estimated_cardinality) {
+      std::swap(selected_probe_table_info, selected_build_table_info);
     }
-    DCHECK(second_table_info != nullptr);
-    table_info_ordered_by_priority.erase(second_table_info_it);
 
-    const P::PhysicalPtr &left_child = first_table_info->table;
-    const P::PhysicalPtr &right_child = second_table_info->table;
+    remaining_tables.erase(selected_probe_table_info);
+    remaining_tables.erase(selected_build_table_info);
+
+    // Figure out the output attributes.
+    const P::PhysicalPtr &probe_child = selected_probe_table_info->table;
+    const P::PhysicalPtr &build_child = selected_build_table_info->table;
     std::vector<E::NamedExpressionPtr> output_attributes;
-    for (const E::AttributeReferencePtr &left_attr : 
left_child->getOutputAttributes()) {
-      output_attributes.emplace_back(left_attr);
+    for (const E::AttributeReferencePtr &probe_attr : 
probe_child->getOutputAttributes()) {
+      output_attributes.emplace_back(probe_attr);
     }
-    for (const E::AttributeReferencePtr &right_attr : 
right_child->getOutputAttributes()) {
-      output_attributes.emplace_back(right_attr);
+    for (const E::AttributeReferencePtr &build_attr : 
build_child->getOutputAttributes()) {
+      output_attributes.emplace_back(build_attr);
     }
 
-    std::vector<E::AttributeReferencePtr> left_join_attributes;
-    std::vector<E::AttributeReferencePtr> right_join_attributes;
-    std::unordered_set<expressions::ExprId> new_joined_attribute_set;
-    for (const auto &join_attr_pair : first_table_info->join_attribute_pairs) {
-      if (second_table_info->join_attribute_pairs.find(join_attr_pair.second)
-              != second_table_info->join_attribute_pairs.end()) {
-        left_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.first]);
-        right_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.second]);
-
-        new_joined_attribute_set.emplace(join_attr_pair.first);
-        new_joined_attribute_set.emplace(join_attr_pair.second);
+    // Figure out the join attributes.
+    std::vector<E::AttributeReferencePtr> probe_attributes;
+    std::vector<E::AttributeReferencePtr> build_attributes;
+    const std::size_t probe_table_id = 
selected_probe_table_info->table_info_id;
+    const std::size_t build_table_id = 
selected_build_table_info->table_info_id;
+    for (const auto &attr_group_pair : join_attribute_groups) {
+      const auto &attr_group = attr_group_pair.second;
+      auto probe_it = attr_group.find(probe_table_id);
+      auto build_it = attr_group.find(build_table_id);
+      if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+        probe_attributes.emplace_back(
+            attribute_id_to_reference_map.at(probe_it->second));
+        build_attributes.emplace_back(
+            attribute_id_to_reference_map.at(build_it->second));
       }
     }
-    DCHECK_GE(left_join_attributes.size(), static_cast<std::size_t>(1));
 
-    if (table_info_ordered_by_priority.size() > 0) {
+    // Create a hash join from the chosen probe/build pair and put it back to
+    // the table pool. Return the last table in the table pool if there is only
+    // one table left.
+    if (remaining_tables.size() > 0) {
       P::PhysicalPtr output =
-          P::HashJoin::Create(left_child,
-                              right_child,
-                              left_join_attributes,
-                              right_join_attributes,
+          P::HashJoin::Create(probe_child,
+                              build_child,
+                              probe_attributes,
+                              build_attributes,
                               nullptr,
                               output_attributes,
                               P::HashJoin::JoinType::kInnerJoin);
 
-      second_table_info->table = output;
+      selected_probe_table_info->table = output;
 
       // TODO(jianqiao): Cache the estimated cardinality for each plan in cost
       // model to avoid duplicated estimation.
-      second_table_info->estimated_cardinality = 
cost_model_->estimateCardinality(output);
-
-      
second_table_info->join_attribute_pairs.insert(first_table_info->join_attribute_pairs.begin(),
-                                                     
first_table_info->join_attribute_pairs.end());
-      
second_table_info->joined_attribute_set.insert(first_table_info->joined_attribute_set.begin(),
-                                                     
first_table_info->joined_attribute_set.end());
-      
second_table_info->joined_attribute_set.insert(new_joined_attribute_set.begin(),
-                                                     
new_joined_attribute_set.end());
-      table_info_ordered_by_priority.emplace(second_table_info);
-
-      
join_graph[second_table_info->table_info_id].insert(join_graph[first_table_info_id].begin(),
-                                                          
join_graph[first_table_info_id].end());
-
+      selected_probe_table_info->estimated_cardinality = 
cost_model_->estimateCardinality(output);
+      selected_probe_table_info->estimated_selectivity = 
cost_model_->estimateSelectivity(output);
+
+      selected_probe_table_info->estimated_num_output_attributes =
+          CountSharedAttributes(join_group.referenced_attributes,
+                                output->getOutputAttributes());
+
+      remaining_tables.emplace(selected_probe_table_info);
+
+      // Update join attribute groups.
+      for (auto &attr_group_pair : join_attribute_groups) {
+        auto &attr_group = attr_group_pair.second;
+        auto build_it = attr_group.find(build_table_id);
+        if (build_it != attr_group.end()) {
+          const E::ExprId attr_id = build_it->second;
+          attr_group.erase(build_it);
+          attr_group.emplace(probe_table_id, attr_id);
+        }
+      }
     } else {
-      return P::HashJoin::Create(left_child,
-                                 right_child,
-                                 left_join_attributes,
-                                 right_join_attributes,
+      return P::HashJoin::Create(probe_child,
+                                 build_child,
+                                 probe_attributes,
+                                 build_attributes,
                                  residual_predicate,
                                  project_expressions,
                                  P::HashJoin::JoinType::kInnerJoin);
@@ -307,5 +353,28 @@ physical::PhysicalPtr 
StarSchemaHashJoinOrderOptimization::generatePlan(
   }
 }
 
+std::size_t StarSchemaHashJoinOrderOptimization::CountSharedAttributes(
+    const std::unordered_set<expressions::ExprId> &attr_set1,
+    const std::vector<expressions::AttributeReferencePtr> &attr_set2) {
+  std::size_t cnt = 0;
+  for (const auto &attr : attr_set2) {
+    if (attr_set1.find(attr->id()) != attr_set1.end()) {
+      ++cnt;
+    }
+  }
+  return cnt;
+}
+
+std::size_t StarSchemaHashJoinOrderOptimization::getEstimatedNumGroups(
+    const physical::PhysicalPtr &input) {
+  P::AggregatePtr aggregate;
+  if (P::SomeAggregate::MatchesWithConditionalCast(input, &aggregate)) {
+    return cost_model_->estimateNumGroupsForAggregate(aggregate);
+  } else {
+    return 0;
+  }
+}
+
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp 
b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index c1a7bae..64e2478 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -20,16 +20,15 @@
 #ifndef 
QUICKSTEP_QUERY_OPTIMIZER_RULES_STAR_SCHEMA_HASH_JOIN_ORDER_OPTIMIZATION_HPP_
 #define 
QUICKSTEP_QUERY_OPTIMIZER_RULES_STAR_SCHEMA_HASH_JOIN_ORDER_OPTIMIZATION_HPP_
 
-#include <algorithm>
 #include <cstddef>
 #include <memory>
 #include <string>
-#include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
 
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
@@ -45,7 +44,11 @@ namespace optimizer {
  */
 
 /**
- * @brief TODO
+ * @brief Rule that applies to a physical plan to optimize hash join orders.
+ *
+ * This optimization applies a greedy algorithm to favor smaller cardinality
+ * and selective tables to be joined first, which is suitable for queries on
+ * star-schema or snowflake-schema tables.
  */
 class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
  public:
@@ -64,6 +67,7 @@ class StarSchemaHashJoinOrderOptimization : public 
Rule<physical::Physical> {
    * @brief A group of tables to form a hash join tree.
    */
   struct JoinGroupInfo {
+    std::unordered_set<expressions::ExprId> referenced_attributes;
     std::vector<physical::PhysicalPtr> tables;
     std::vector<std::pair<expressions::ExprId, expressions::ExprId>> 
join_attribute_pairs;
   };
@@ -72,49 +76,91 @@ class StarSchemaHashJoinOrderOptimization : public 
Rule<physical::Physical> {
    * @brief Auxiliary information of a table for the optimizer.
    */
   struct TableInfo {
-    TableInfo(const std::size_t in_table_info_id,
-              const physical::PhysicalPtr &in_table,
-              const std::size_t in_estimated_cardinality,
-              const double in_estimated_selectivity)
-        : table_info_id(in_table_info_id),
-          table(in_table),
-          estimated_cardinality(in_estimated_cardinality),
-          estimated_selectivity(in_estimated_selectivity) {
+    TableInfo(const std::size_t table_info_id_in,
+              const physical::PhysicalPtr &table_in,
+              const std::size_t estimated_cardinality_in,
+              const double estimated_selectivity_in,
+              const std::size_t estimated_num_output_attributes_in,
+              const bool is_aggregation_in)
+        : table_info_id(table_info_id_in),
+          table(table_in),
+          estimated_cardinality(estimated_cardinality_in),
+          estimated_selectivity(estimated_selectivity_in),
+          estimated_num_output_attributes(estimated_num_output_attributes_in) {
     }
 
     const std::size_t table_info_id;
     physical::PhysicalPtr table;
     std::size_t estimated_cardinality;
     double estimated_selectivity;
-    std::unordered_multimap<expressions::ExprId, expressions::ExprId> 
join_attribute_pairs;
-    std::unordered_set<expressions::ExprId> joined_attribute_set;
+    std::size_t estimated_num_output_attributes;
   };
 
-  /**
-   * @brief Comparator that compares the join priorities between two tables.
-   */
-  struct TableInfoPtrLessComparator {
-    inline bool operator() (const TableInfo *lhs, const TableInfo *rhs) {
-      bool swapped = false;
-      if (lhs->estimated_cardinality > rhs->estimated_cardinality) {
-        std::swap(lhs, rhs);
-        swapped = true;
+  struct JoinPair {
+    JoinPair(TableInfo *probe_in,
+             TableInfo *build_in,
+             const bool build_side_unique_in,
+             const std::size_t num_join_attributes_in)
+        : probe(probe_in),
+          build(build_in),
+          build_side_unique(build_side_unique_in),
+          num_join_attributes(num_join_attributes_in) {
+    }
+
+    inline bool isBetterThan(const JoinPair &rhs) const {
+      const auto &lhs = *this;
+
+      // Avoid carrying too many output attributes all the way through a long
+      // chain of hash joins.
+      const bool lhs_has_large_output =
+          lhs.build->estimated_num_output_attributes
+              + lhs.probe->estimated_num_output_attributes > 5;
+      const bool rhs_has_large_output =
+          rhs.build->estimated_num_output_attributes
+              + rhs.probe->estimated_num_output_attributes > 5;
+      if (lhs_has_large_output != rhs_has_large_output) {
+        return rhs_has_large_output;
+      }
+
+      // Prefer foreign-key primary-key style hash joins.
+      if (lhs.build_side_unique != rhs.build_side_unique) {
+        return lhs.build_side_unique;
+      }
+
+      // Prefer hash joins where the build side table is small.
+      const bool lhs_has_small_build = lhs.build->estimated_cardinality < 
0x100;
+      const bool rhs_has_small_build = rhs.build->estimated_cardinality < 
0x100;
+      if (lhs_has_small_build != rhs_has_small_build) {
+        return lhs_has_small_build;
       }
 
-      if (lhs->estimated_selectivity < rhs->estimated_selectivity) {
-        return !swapped;
-      } else if (lhs->estimated_cardinality < 100u &&
-                 rhs->estimated_cardinality > 10000u &&
-                 lhs->estimated_selectivity < rhs->estimated_selectivity * 
1.5) {
-        return !swapped;
-      } else if (lhs->estimated_selectivity > rhs->estimated_selectivity) {
-        return swapped;
-      } else if (lhs->estimated_cardinality != rhs->estimated_cardinality) {
-        return !swapped;
+      // Prefer hash joins where the probe side table is small. This is effective
+      // for TPCH style (snowflake schema) queries, with the help of LIPFilters.
+      if (lhs.probe->estimated_cardinality != 
rhs.probe->estimated_cardinality) {
+        return lhs.probe->estimated_cardinality < 
rhs.probe->estimated_cardinality;
+      }
+
+      // Prefer build side tables with better selectivity. This is effective
+      // for SSB style queries.
+      if (lhs.build->estimated_selectivity != 
rhs.build->estimated_selectivity) {
+        return lhs.build->estimated_selectivity < 
rhs.build->estimated_selectivity;
+      }
+
+      // Residual rules that help provide a total order.
+      if (lhs.build->estimated_cardinality != 
rhs.build->estimated_cardinality) {
+        return lhs.build->estimated_cardinality < 
rhs.build->estimated_cardinality;
+      }
+      if (lhs.probe->table != rhs.probe->table) {
+        return lhs.probe->table < rhs.probe->table;
       } else {
-        return swapped ^ (lhs->table < rhs->table);
+        return lhs.build->table < rhs.build->table;
       }
     }
+
+    TableInfo *probe;
+    TableInfo *build;
+    const bool build_side_unique;
+    const std::size_t num_join_attributes;
   };
 
   physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input,
@@ -125,6 +171,12 @@ class StarSchemaHashJoinOrderOptimization : public 
Rule<physical::Physical> {
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions);
 
+  std::size_t getEstimatedNumGroups(const physical::PhysicalPtr &input);
+
+  static std::size_t CountSharedAttributes(
+      const std::unordered_set<expressions::ExprId> &attr_set1,
+      const std::vector<expressions::AttributeReferencePtr> &attr_set2);
+
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
 
   DISALLOW_COPY_AND_ASSIGN(StarSchemaHashJoinOrderOptimization);


Reply via email to