Github user ioana-delaney commented on a diff in the pull request:
https://github.com/apache/spark/pull/15363#discussion_r104825494
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -20,19 +20,342 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.planning.{BaseTableAccess, ExtractFiltersAndInnerJoins}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.CatalystConf
+
+/**
+ * Encapsulates star-schema join detection.
+ */
+case class DetectStarSchemaJoin(conf: CatalystConf) extends PredicateHelper {
+
+ /**
+ * A star schema consists of one or more fact tables referencing a number of dimension
+ * tables. In general, star-schema joins are detected using the following conditions:
+ * 1. Informational RI constraints (reliable detection)
+ * + Dimension contains a primary key that is being joined to the fact table.
+ * + Fact table contains foreign keys referencing multiple dimension tables.
+ * 2. Cardinality based heuristics
+ * + Usually, the table with the highest cardinality is the fact table.
+ * + The table joined with the largest number of tables is the fact table.
+ *
+ * To detect star joins, the algorithm uses a combination of the above two conditions.
+ * The fact table is chosen based on the cardinality heuristics, and the dimension
+ * tables are chosen based on the RI constraints. A star join will consist of the largest
+ * fact table joined with the dimension tables on their primary keys. To detect that a
+ * column is a primary key, the algorithm uses table and column statistics.
+ *
+ * Since Catalyst only supports left-deep tree plans, the algorithm currently returns only
+ * the star join with the largest fact table. Choosing the largest fact table on the
+ * driving arm to avoid large inners is in general a good heuristic. This restriction can
+ * be lifted with support for bushy tree plans.
+ *
+ * The highlights of the algorithm are the following:
+ *
+ * Given a set of joined tables/plans, the algorithm first verifies if they are eligible
+ * for star join detection. An eligible plan is a base table access with valid statistics.
+ * A base table access represents a Project or Filter operator above a LeafNode. Conservatively,
+ * the algorithm only considers base table accesses as part of a star join since they provide
+ * reliable statistics.
+ *
+ * If some of the plans are not base table accesses, or statistics are not available, the
+ * algorithm falls back to the positional join reordering, since in the absence of statistics
+ * it cannot make good planning decisions. Otherwise, the algorithm finds the table with the
+ * largest cardinality (number of rows), which is assumed to be the fact table.
+ *
+ * Next, it computes the set of dimension tables for the current fact table. A dimension table
+ * is assumed to be in an RI relationship with the fact table. To infer column uniqueness,
+ * the algorithm compares the number of distinct values with the total number of rows in the
+ * table. If their relative difference is within certain limits (i.e. ndvMaxError * 2, adjusted
+ * based on TPC-DS data), the column is assumed to be unique.
+ *
+ * Given a star join, i.e. fact and dimension tables, the algorithm considers three cases:
+ *
+ * 1) The star join is an expanding join, i.e. the fact table is joined using inequality
+ * predicates or a Cartesian product. In this case, the algorithm conservatively falls back
+ * to the default join reordering since it cannot make good planning decisions in the absence
+ * of a cost model.
+ *
+ * 2) The star join is a selective join. This case is detected by observing local predicates
+ * on the dimension tables. In a star schema relationship, the join between the fact and a
+ * dimension table is a FK-PK join. Heuristically, a selective dimension may reduce
+ * the result of a join.
+ *
+ * 3) The star join is not a selective join (i.e. it doesn't reduce the number of rows). In
+ * this case, the algorithm conservatively falls back to the default join reordering.
+ *
+ * If an eligible star join was found in case 2 above, the algorithm reorders the tables based
+ * on the following heuristics:
+ * 1) Place the largest fact table on the driving arm to avoid large tables on the inner of a
+ * join and thus favor hash joins.
+ * 2) Apply the most selective dimensions early in the plan to reduce the data flow.
+ *
+ * Other assumptions made by the algorithm, mainly to prevent regressions in the absence of a
+ * cost model, are the following:
+ * 1) It only considers star joins with more than one dimension, which is a typical
+ * star join scenario.
+ * 2) If the largest tables have a comparable number of rows, fall back to the default
+ * join reordering. This prevents changing the position of large tables in the join.
+ */
+ def findStarJoinPlan(
+ input: Seq[(LogicalPlan, InnerLike)],
+ conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = {
+ assert(input.size >= 2)
+
+ val emptyStarJoinPlan = Seq.empty[(LogicalPlan, InnerLike)]
+
+ // Find if the input plans are eligible for star join detection.
+ // An eligible plan is a base table access with valid statistics.
+ val foundEligibleJoin = input.forall { plan =>
+ plan._1 match {
+ case BaseTableAccess(t, _) if t.stats(conf).rowCount.isDefined => true
+ case _ => false
+ }
+ }
+
+ if (!foundEligibleJoin) {
+ // Some plans don't have stats or are complex plans. Conservatively fall back
+ // to the default join reordering by returning an empty star join.
+ // This restriction can be lifted once statistics are propagated in the plan.
+ emptyStarJoinPlan
+
+ } else {
+ // Find the fact table using cardinality based heuristics i.e.
+ // the table with the largest number of rows.
+ val sortedFactTables = input.map { plan =>
+ TableCardinality(plan, computeTableCardinality(plan._1, conditions))
+ }.collect { case t @ TableCardinality(_, Some(_)) =>
+ t
+ }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse)
+
+ sortedFactTables match {
+ case Nil =>
+ emptyStarJoinPlan
+ case table1 :: table2 :: _ if table2.size.get.toDouble >
+ conf.starJoinFactTableRatio * table1.size.get.toDouble =>
+ // The largest tables have comparable number of rows.
+ emptyStarJoinPlan
+ case TableCardinality(factPlan @ (factTable, _), _) :: _ =>
+ // Find the fact table joins.
+ val allFactJoins = input.filterNot(_._1 eq factTable).filter { plan =>
+ val joinCond = findJoinConditions(factTable, plan._1, conditions)
+ joinCond.nonEmpty
+ }
+
+ // Find the corresponding join conditions.
+ val allFactJoinCond = allFactJoins.flatMap { plan =>
+ val joinCond = findJoinConditions(factTable, plan._1, conditions)
+ joinCond
+ }
+
+ // Verify that the join columns have valid statistics.
+ val areStatsAvailable = allFactJoins.forall { plan =>
+ val dimTable = plan._1
+ allFactJoinCond.exists {
+ case BinaryComparison(lhs: AttributeReference, rhs: AttributeReference) =>
+ val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs
+ val factCol = if (factTable.outputSet.contains(lhs)) lhs else rhs
+ hasStatistics(dimCol, dimTable) && hasStatistics(factCol, factTable)
+ case _ => false
+ }
+ }
+
+ if (!areStatsAvailable) {
+ emptyStarJoinPlan
+ } else {
+ // Find the subset of dimension tables. A dimension table is assumed to be in
+ // an RI relationship with the fact table. Also, conservatively, only consider
+ // equi-joins between a fact and a dimension table.
+ val eligibleDimPlans = allFactJoins.filter { plan =>
+ val dimTable = plan._1
+ allFactJoinCond.exists {
+ case cond @ BinaryComparison(lhs: AttributeReference, rhs: AttributeReference)
+ if cond.isInstanceOf[EqualTo] || cond.isInstanceOf[EqualNullSafe] =>
+ val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs
+ isUnique(dimCol, dimTable)
+ case _ => false
+ }
+ }
+
+ if (eligibleDimPlans.isEmpty) {
+ // An eligible star join was not found because the join is not
+ // an RI join, or the star join is an expanding join.
+ // Conservatively fall back to the default join order.
+ emptyStarJoinPlan
+ } else if (eligibleDimPlans.size < 2) {
+ // Conservatively assume that a fact table is joined with more than one dimension.
+ emptyStarJoinPlan
+ } else if (isSelectiveStarJoin(eligibleDimPlans.map(_._1), conditions)) {
+ // This is a selective star join. Reorder the dimensions based on their
+ // cardinality and return the star-join plan.
+ val sortedDims = eligibleDimPlans.map { plan =>
+ TableCardinality(plan, computeTableCardinality(plan._1, conditions))
+ }.sortBy(_.size).map {
+ case TableCardinality(plan, _) => plan
+ }
+ factPlan +: sortedDims
+ } else {
+ // This is a non-selective star join. Conservatively fall back to the default
+ // join order.
+ emptyStarJoinPlan
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Determines if a column referenced by a base table access is a primary key.
+ * A column is a PK if it is not nullable and has unique values.
+ * To determine if a column has unique values in the absence of informational
+ * RI constraints, the number of distinct values is compared to the total
+ * number of rows in the table. If their relative difference
+ * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError, based
+ * on TPC-DS data results), the column is assumed to have unique values.
+ */
+ private def isUnique(
+ column: Attribute,
+ plan: LogicalPlan): Boolean = plan match {
+ case BaseTableAccess(t, _) =>
+ val leafCol = findLeafNodeCol(column, plan)
+ leafCol match {
+ case Some(col) if t.outputSet.contains(col) =>
+ val stats = t.stats(conf)
+ stats.rowCount match {
+ case Some(rowCount) if rowCount >= 0 =>
+ if (stats.attributeStats.contains(col)) {
+ val colStats = stats.attributeStats(col)
+ if (colStats.nullCount > 0) {
+ false
+ } else {
+ val distinctCount = colStats.distinctCount
+ val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
+ // ndvMaxError adjusted based on TPC-DS 1TB data results
+ relDiff <= conf.ndvMaxError * 2
+ }
+ } else false
+ case _ => false
+ }
+ case _ => false
+ }
+ case _ => false
+ }
+
+ /**
+ * Given a column over a base table access, it returns
+ * the leaf node column from which the input column is derived.
+ */
+ @tailrec
+ private def findLeafNodeCol(
+ column: Attribute,
+ plan: LogicalPlan): Option[Attribute] = plan match {
+ case pl @ BaseTableAccess(_, _) =>
+ pl match {
+ case t: LeafNode if t.outputSet.contains(column) =>
+ Option(column)
+ case p: Project if p.outputSet.exists(_.semanticEquals(column)) =>
+ val col = p.outputSet.find(_.semanticEquals(column)).get
+ findLeafNodeCol(col, p.child)
+ case f: Filter =>
+ findLeafNodeCol(column, f.child)
+ case _ => None
+ }
+ case _ => None
+ }
+
+ /**
+ * Checks if a column has statistics.
+ * The column is assumed to be over a base table access.
+ */
+ private def hasStatistics(
+ column: Attribute,
+ plan: LogicalPlan): Boolean = plan match {
+ case BaseTableAccess(t, _) =>
+ val leafCol = findLeafNodeCol(column, plan)
+ leafCol match {
+ case Some(col) if t.outputSet.contains(col) =>
+ val stats = t.stats(conf)
+ stats.attributeStats.contains(col)
+ case _ => false
+ }
+ case _ => false
+ }
+
+ /**
+ * Returns the join predicates between two input plans. It only
+ * considers basic comparison operators.
+ */
+ @inline
+ private def findJoinConditions(
+ plan1: LogicalPlan,
+ plan2: LogicalPlan,
+ conditions: Seq[Expression]): Seq[Expression] = {
+ val refs = plan1.outputSet ++ plan2.outputSet
+ conditions.filter {
+ case BinaryComparison(_, _) => true
+ case _ => false
+ }.filterNot(canEvaluate(_, plan1))
+ .filterNot(canEvaluate(_, plan2))
+ .filter(_.references.subsetOf(refs))
+ }
+
+ /**
+ * Checks if a star join is a selective join. A star join is assumed
+ * to be selective if there are local predicates on the dimension
+ * tables.
+ */
+ private def isSelectiveStarJoin(
+ dimTables: Seq[LogicalPlan],
+ conditions: Seq[Expression]): Boolean = dimTables.exists {
+ case plan @ BaseTableAccess(_, p) =>
+ // Checks if any condition applies to the dimension tables.
+ // Exclude the IsNotNull predicates until predicate selectivity is available.
+ // In most cases, this predicate is artificially introduced by the Optimizer
+ // to enforce nullability constraints.
+ val localPredicates = conditions.filterNot(_.isInstanceOf[IsNotNull])
+ .exists(canEvaluate(_, plan))
+
+ // Checks if there are any predicates pushed down to the base table access.
+ val pushedDownPredicates = p.nonEmpty && !p.forall(_.isInstanceOf[IsNotNull])
+
+ localPredicates || pushedDownPredicates
+ case _ => false
+ }
+
+ /**
+ * Helper case class to hold (plan, rowCount) pairs.
+ */
+ private case class TableCardinality(plan: (LogicalPlan, InnerLike), size: Option[BigInt])
+
+ /**
+ * Computes table cardinality after applying the predicates.
+ * Currently, the function returns table cardinality.
+ * When predicate selectivity is implemented in Catalyst,
--- End diff --
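For readers skimming the diff, the NDV-based uniqueness test in `isUnique` boils down to a simple relative-difference check. Below is a minimal standalone sketch; the object and method names are hypothetical and not part of the patch:

```scala
object UniquenessSketch {
  // A column is treated as unique when its distinct-value count is within
  // 2 * ndvMaxError of the table's row count, in relative terms. This mirrors
  // the heuristic described in the isUnique scaladoc in the diff above.
  def isLikelyUnique(distinctCount: BigInt, rowCount: BigInt, ndvMaxError: Double): Boolean = {
    require(rowCount > 0, "row count must be positive")
    val relDiff = math.abs(distinctCount.toDouble / rowCount.toDouble - 1.0d)
    relDiff <= ndvMaxError * 2
  }
}
```

For example, with an ndvMaxError of 0.05, a column with 998 distinct values over 1000 rows would be treated as unique (relative difference 0.002), while 500 distinct values over 1000 rows would not.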
@gatorsmile Yes, thank you. I forgot about the recent CBO cardinality changes. I've incorporated them.
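On a related note, the starJoinFactTableRatio fallback (the `table1 :: table2 :: _` case in `findStarJoinPlan`) can be sketched as a standalone check. The names below are hypothetical and the 0.9 ratio is only an example value, not a claim about the config default:

```scala
object FactTableSketch {
  // Given row counts sorted in descending order, a star join is attempted only
  // when the second-largest table is clearly smaller than the largest one;
  // otherwise there is no dominant fact table and the default order is kept.
  def hasDominantFactTable(sortedRowCounts: Seq[BigInt], factTableRatio: Double): Boolean =
    sortedRowCounts match {
      case t1 +: t2 +: _ => t2.toDouble <= factTableRatio * t1.toDouble
      case _ +: _ => true // a single table trivially dominates
      case _ => false // no eligible tables
    }
}
```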