Github user ioana-delaney commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15363#discussion_r104825321
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
    @@ -20,19 +20,342 @@ package org.apache.spark.sql.catalyst.optimizer
     import scala.annotation.tailrec
     
     import org.apache.spark.sql.catalyst.expressions._
    -import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
    +import org.apache.spark.sql.catalyst.planning.{BaseTableAccess, 
ExtractFiltersAndInnerJoins}
     import org.apache.spark.sql.catalyst.plans._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.rules._
    +import org.apache.spark.sql.catalyst.CatalystConf
    +
    +/**
    + * Encapsulates star-schema join detection.
    + */
    +case class DetectStarSchemaJoin(conf: CatalystConf) extends 
PredicateHelper {
    +
    +  /**
    +   * Star schema consists of one or more fact tables referencing a number 
of dimension
    +   * tables. In general, star-schema joins are detected using the 
following conditions:
    +   *  1. Informational RI constraints (reliable detection)
    +   *    + Dimension contains a primary key that is being joined to the 
fact table.
    +   *    + Fact table contains foreign keys referencing multiple dimension 
tables.
    +   *  2. Cardinality based heuristics
    +   *    + Usually, the table with the highest cardinality is the fact 
table.
    +   *    + Table being joined with the most number of tables is the fact 
table.
    +   *
    +   * To detect star joins, the algorithm uses a combination of the above 
two conditions.
    +   * The fact table is chosen based on the cardinality heuristics, and the 
dimension
    +   * tables are chosen based on the RI constraints. A star join will 
consist of the largest
    +   * fact table joined with the dimension tables on their primary keys. To 
detect that a
    +   * column is a primary key, the algorithm uses table and column 
statistics.
    +   *
    +   * Since Catalyst only supports left-deep tree plans, the algorithm 
currently returns only
    +   * the star join with the largest fact table. Choosing the largest fact 
table on the
    +   * driving arm to avoid large inners is in general a good heuristic. 
This restriction can
    +   * be lifted with support for bushy tree plans.
    +   *
    +   * The highlights of the algorithm are the following:
    +   *
    +   * Given a set of joined tables/plans, the algorithm first verifies if 
they are eligible
    +   * for star join detection. An eligible plan is a base table access with 
valid statistics.
    +   * A base table access represents Project or Filter operators above a 
LeafNode. Conservatively,
    +   * the algorithm only considers base table access as part of a star join 
since they provide
    +   * reliable statistics.
    +   *
    +   * If some of the plans are not base table access, or statistics are not 
available, the algorithm
    +   * falls back to the positional join reordering, since in the absence of 
statistics it cannot make
    +   * good planning decisions. Otherwise, the algorithm finds the table 
with the largest cardinality
    +   * (number of rows), which is assumed to be a fact table.
    +   *
    +   * Next, it computes the set of dimension tables for the current fact 
table. A dimension table
    +   * is assumed to be in a RI relationship with a fact table. To infer 
column uniqueness,
    +   * the algorithm compares the number of distinct values with the total 
number of rows in the
    +   * table. If their relative difference is within certain limits (i.e. 
ndvMaxError * 2, adjusted
    +   * based on tpcds data), the column is assumed to be unique.
    +   *
    +   * Given a star join, i.e. fact and dimension tables, the algorithm 
considers three cases:
    +   *
    +   * 1) The star join is an expanding join i.e. the fact table is joined 
using inequality
    +   * predicates or Cartesian product. In this case, the algorithm 
conservatively falls back
    +   * to the default join reordering since it cannot make good planning 
decisions in the absence
    +   * of the cost model.
    +   *
    +   * 2) The star join is a selective join. This case is detected by 
observing local predicates
    +   * on the dimension tables. In a star schema relationship, the join 
between the fact and the
    +   * dimension table is a FK-PK join. Heuristically, a selective dimension 
may reduce
    +   * the result of a join.
    +   *
    +   * 3) The star join is not a selective join (i.e. doesn't reduce the 
number of rows). In this
    +   * case, the algorithm conservatively falls back to the default join 
reordering.
    +   *
    +   * If an eligible star join was found in step 2 above, the algorithm 
reorders the tables based
    +   * on the following heuristics:
    +   * 1) Place the largest fact table on the driving arm to avoid large 
tables on the inner of a
    +   *    join and thus favor hash joins.
    +   * 2) Apply the most selective dimensions early in the plan to reduce 
data flow.
    +   *
    +   * Other assumptions made by the algorithm, mainly to prevent 
regressions in the absence of a
    +   * cost model, are the following:
    +   * 1) Only considers star joins with more than one dimensions, which is 
a typical
    +   *    star join scenario.
    +   * 2) If the top largest tables have comparable number of rows, fall 
back to the default
    +   *    join reordering. This will prevent changing the position of the 
large tables in the join.
    +   */
    +  def findStarJoinPlan(
    +      input: Seq[(LogicalPlan, InnerLike)],
    +      conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = {
    +    assert(input.size >= 2)
    +
    +    val emptyStarJoinPlan = Seq.empty[(LogicalPlan, InnerLike)]
    +
    +    // Find if the input plans are eligible for star join detection.
    +    // An eligible plan is a base table access with valid statistics.
    +    val foundEligibleJoin = input.forall { plan =>
    +      plan._1 match {
    +        case BaseTableAccess(t, _) if t.stats(conf).rowCount.isDefined => 
true
    +        case _ => false
    +      }
    +    }
    +
    +    if (!foundEligibleJoin) {
    +      // Some plans don't have stats or are complex plans. Conservatively 
fall back
    +      // to the default join reordering by returning an empty star join.
    +      // This restriction can be lifted once statistics are propagated in 
the plan.
    +      emptyStarJoinPlan
    +
    +    } else {
    +      // Find the fact table using cardinality based heuristics i.e.
    +      // the table with the largest number of rows.
    +      val sortedFactTables = input.map { plan =>
    +        TableCardinality(plan, computeTableCardinality(plan._1, 
conditions))
    +      }.collect { case t @ TableCardinality(_, Some(_)) =>
    +        t
    +      }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse)
    +
    +      sortedFactTables match {
    +        case Nil =>
    +          emptyStarJoinPlan
    +        case table1 :: table2 :: _ if table2.size.get.toDouble >
    +            conf.starJoinFactTableRatio * table1.size.get.toDouble =>
    +          // The largest tables have comparable number of rows.
    +          emptyStarJoinPlan
    +        case TableCardinality(factPlan @ (factTable, _), _) :: _ =>
    +          // Find the fact table joins.
    +          val allFactJoins = input.filterNot(_._1 eq factTable).filter { 
plan =>
    +            val joinCond = findJoinConditions(factTable, plan._1, 
conditions)
    +            joinCond.nonEmpty
    +          }
    +
    +          // Find the corresponding join conditions.
    +          val allFactJoinCond = allFactJoins.flatMap { plan =>
    +            val joinCond = findJoinConditions(factTable, plan._1, 
conditions)
    +            joinCond
    +          }
    +
    +          // Verify if the join columns have valid statistics
    +          val areStatsAvailable = allFactJoins.forall { plan =>
    +            val dimTable = plan._1
    --- End diff --
    
    @gatorsmile Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to