Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15363#discussion_r104731830
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
---
@@ -20,19 +20,342 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.planning.{BaseTableAccess, ExtractFiltersAndInnerJoins}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.CatalystConf
+
+/**
+ * Encapsulates star-schema join detection.
+ */
+case class DetectStarSchemaJoin(conf: CatalystConf) extends PredicateHelper {
+
+ /**
+ * Star schema consists of one or more fact tables referencing a number of dimension
+ * tables. In general, star-schema joins are detected using the following conditions:
+ * 1. Informational RI constraints (reliable detection)
+ * + Dimension contains a primary key that is being joined to the fact table.
+ * + Fact table contains foreign keys referencing multiple dimension tables.
+ * 2. Cardinality based heuristics
+ * + Usually, the table with the highest cardinality is the fact table.
+ * + Table being joined with the most number of tables is the fact table.
+ *
+ * To detect star joins, the algorithm uses a combination of the above two conditions.
+ * The fact table is chosen based on the cardinality heuristics, and the dimension
+ * tables are chosen based on the RI constraints. A star join will consist of the largest
+ * fact table joined with the dimension tables on their primary keys. To detect that a
+ * column is a primary key, the algorithm uses table and column statistics.
+ *
+ * Since Catalyst only supports left-deep tree plans, the algorithm currently returns only
+ * the star join with the largest fact table. Choosing the largest fact table on the
+ * driving arm to avoid large inners is in general a good heuristic. This restriction can
+ * be lifted with support for bushy tree plans.
+ *
+ * The highlights of the algorithm are the following:
+ *
+ * Given a set of joined tables/plans, the algorithm first verifies if they are eligible
+ * for star join detection. An eligible plan is a base table access with valid statistics.
+ * A base table access represents Project or Filter operators above a LeafNode. Conservatively,
+ * the algorithm only considers base table access as part of a star join since they provide
+ * reliable statistics.
+ *
+ * If some of the plans are not base table access, or statistics are not available, the algorithm
+ * falls back to the positional join reordering, since in the absence of statistics it cannot make
+ * good planning decisions. Otherwise, the algorithm finds the table with the largest cardinality
+ * (number of rows), which is assumed to be a fact table.
+ *
+ * Next, it computes the set of dimension tables for the current fact table. A dimension table
+ * is assumed to be in a RI relationship with a fact table. To infer column uniqueness,
+ * the algorithm compares the number of distinct values with the total number of rows in the
+ * table. If their relative difference is within certain limits (i.e. ndvMaxError * 2, adjusted
+ * based on tpcds data), the column is assumed to be unique.
+ *
+ * Given a star join, i.e. fact and dimension tables, the algorithm considers three cases:
+ *
+ * 1) The star join is an expanding join i.e. the fact table is joined using inequality
+ * predicates or Cartesian product. In this case, the algorithm conservatively falls back
+ * to the default join reordering since it cannot make good planning decisions in the absence
+ * of the cost model.
+ *
+ * 2) The star join is a selective join. This case is detected by observing local predicates
+ * on the dimension tables. In a star schema relationship, the join between the fact and the
+ * dimension table is a FK-PK join. Heuristically, a selective dimension may reduce
+ * the result of a join.
+ *
+ * 3) The star join is not a selective join (i.e. doesn't reduce the number of rows). In this
+ * case, the algorithm conservatively falls back to the default join reordering.
+ *
+ * If an eligible star join was found in step 2 above, the algorithm reorders the tables based
+ * on the following heuristics:
+ * 1) Place the largest fact table on the driving arm to avoid large tables on the inner of a
+ * join and thus favor hash joins.
+ * 2) Apply the most selective dimensions early in the plan to reduce data flow.
+ *
+ * Other assumptions made by the algorithm, mainly to prevent regressions in the absence of a
+ * cost model, are the following:
+ * 1) Only considers star joins with more than one dimensions, which is a typical
+ * star join scenario.
+ * 2) If the top largest tables have comparable number of rows, fall back to the default
+ * join reordering. This will prevent changing the position of the large tables in the join.
+ */
+ def findStarJoinPlan(
+ input: Seq[(LogicalPlan, InnerLike)],
+ conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = {
+ assert(input.size >= 2)
+
+ val emptyStarJoinPlan = Seq.empty[(LogicalPlan, InnerLike)]
+
+ // Find if the input plans are eligible for star join detection.
+ // An eligible plan is a base table access with valid statistics.
+ val foundEligibleJoin = input.forall { plan =>
+ plan._1 match {
+ case BaseTableAccess(t, _) if t.stats(conf).rowCount.isDefined => true
+ case _ => false
+ }
+ }
+
+ if (!foundEligibleJoin) {
+ // Some plans don't have stats or are complex plans. Conservatively fall back
+ // to the default join reordering by returning an empty star join.
+ // This restriction can be lifted once statistics are propagated in the plan.
+ emptyStarJoinPlan
+
+ } else {
+ // Find the fact table using cardinality based heuristics i.e.
+ // the table with the largest number of rows.
+ val sortedFactTables = input.map { plan =>
+ TableCardinality(plan, computeTableCardinality(plan._1, conditions))
+ }.collect { case t @ TableCardinality(_, Some(_)) =>
+ t
+ }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse)
+
+ sortedFactTables match {
+ case Nil =>
+ emptyStarJoinPlan
+ case table1 :: table2 :: _ if table2.size.get.toDouble >
+ conf.starJoinFactTableRatio * table1.size.get.toDouble =>
--- End diff --
Nit: style issue. Please put the `if` guard on its own indented line:
```Scala
case table1 :: table2 :: _
  if table2.size.get.toDouble > conf.starJoinFactTableRatio * table1.size.get.toDouble =>
  // The largest tables have comparable number of rows.
  emptyStarJoinPlan
```
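For anyone reading this outside the PR, here is a rough standalone sketch of what that guard does. It is not the PR's code: `TableSize`, `FactTableGuard`, and `pickFactTable` are hypothetical stand-ins for `TableCardinality` and the surrounding match, and `ratio` plays the role of `conf.starJoinFactTableRatio` (the `0.9` in the usage note is an assumed value for illustration only).

```scala
// Hypothetical stand-in for the PR's TableCardinality; illustrative only.
final case class TableSize(name: String, rows: BigInt)

object FactTableGuard {
  // Returns the fact-table candidate, or None when detection should fall
  // back to the default join reordering.
  def pickFactTable(tables: Seq[TableSize], ratio: Double): Option[TableSize] = {
    // Sort by descending row count so the largest table comes first.
    tables.sortBy(_.rows)(Ordering[BigInt].reverse).toList match {
      case Nil =>
        None
      case largest :: second :: _
          if second.rows.toDouble > ratio * largest.rows.toDouble =>
        // The two largest tables have comparable row counts.
        None
      case largest :: _ =>
        Some(largest)
    }
  }
}
```

With an assumed ratio of `0.9`, tables of 1000 and 950 rows count as comparable and the sketch returns `None`, while 1000 and 10 rows yield the 1000-row table as the fact-table candidate.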