[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17546 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r94456 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. --- End diff -- @cloud-fan Left-deep trees are executed in a pipelined fashion. Given the following join trees: ``` left-deep tree: join / \ join t3 / \ t1 t2 bushy-tree: join / \ t1 join /\ t2 t3 ``` The bushy-tree plan (right-deep in this case) requires the result of (t2 join t3) to be materialized before joining with t1. The left-deep tree doesnât have this requirement. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r92785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid --- End diff -- @cloud-fan DP join enumeration relies on ```ReorderJoin``` to pull up the Cartesian products. Instead, I think that Cartesian pull-up should be implemented as another filter on top of DP join enumeration. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r92197 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging { case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0)) }.toMap) +// Build filters from the join graph to be used by the search algorithm. +val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex) --- End diff -- @cloud-fan I call it "filters" since the join graph information is used as filters on top of the DP join enumeration. It suggests the purpose for which the graph info was gathered. If this is confusing, I can rename. Let me know. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r91433 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- @viirya Star-schema detection is called from both ```CostBasedJoinReorder``` and ```ReorderJoin```. In the latter case, it is called to reorder star-joins based on heuristics if cbo is disabled. When cost-based optimizer becomes the default optimizer, we donât need to reorder star-joins in ```ReorderJoin``` based on heuristics since the cost-based optimizer will choose the best plan based on cost. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r111098712 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. --- End diff -- I don't get it, doesn't left-deep tree materialize intermediate results? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r111098412 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid --- End diff -- We already have this logic in the dp join reorder algorithm. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r111098128 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging { case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0)) }.toMap) +// Build filters from the join graph to be used by the search algorithm. +val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex) --- End diff -- why call it filters? should we name it `planInfo`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r111081203 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- What the meaning of "remove the call from `ReorderJoin`"? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r111063912 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- @cloud-fan Once CBO is enabled by default, I can remove the call from ```ReorderJoin```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r111058094 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- do we have a plan to completely merge these 2 rules? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110989528 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( --- End diff -- Removed star-filter dependency on the ```STARSCHEMA_DETECTION``` conf. When star-schema is called from CBO, it is under the control of ```JOIN_REORDER_DP_STAR_FILTER```. When called from ```ReorderJoin```, it is under the control of the ```STARSCHEMA_DETECTION``` conf. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110988847 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala --- @@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper { val emptyStarJoinPlan = Seq.empty[LogicalPlan] -if (!conf.starSchemaDetection || input.size < 2) { +if (input.size < 2) { --- End diff -- Removed unnecessary call to conf.starSchemaDetection. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110987936 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging { } /** - * Builds a new JoinPlan when both conditions hold: + * Builds a new JoinPlan if the following conditions hold: * - the sets of items contained in left and right sides do not overlap. * - there exists at least one join condition involving references from both sides. + * - if star-join filter is enabled, allow the following combinations: + * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join + * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan) + * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join + * * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. * @param otherJoinPlan The other side JoinPlan for building a new join node. * @param conf SQLConf for statistics computation. * @param conditions The overall set of join conditions. * @param topOutput The output attributes of the final plan. + * @param filters Join graph info to be used as filters by the search algorithm. * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, conf: SQLConf, conditions: Set[Expression], - topOutput: AttributeSet): Option[JoinPlan] = { + topOutput: AttributeSet, + filters: Option[JoinGraphInfo]): Option[JoinPlan] = { if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { // Should not join two overlapping item sets. return None } +if (conf.joinReorderDPStarFilter && filters.isDefined) { --- End diff -- @wzhfy I moved the ```conf``` condition inside ```buildJoinGraphInfo```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110987408 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +349,110 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) --- End diff -- @wzhfy This is a very useful code simplification. Thank you! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110987294 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, --- End diff -- @wzhfy Removed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110987218 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging { } /** - * Builds a new JoinPlan when both conditions hold: + * Builds a new JoinPlan if the following conditions hold: * - the sets of items contained in left and right sides do not overlap. * - there exists at least one join condition involving references from both sides. + * - if star-join filter is enabled, allow the following combinations: + * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join + * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan) + * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join + * * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. * @param otherJoinPlan The other side JoinPlan for building a new join node. * @param conf SQLConf for statistics computation. * @param conditions The overall set of join conditions. * @param topOutput The output attributes of the final plan. + * @param filters Join graph info to be used as filters by the search algorithm. * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, conf: SQLConf, conditions: Set[Expression], - topOutput: AttributeSet): Option[JoinPlan] = { + topOutput: AttributeSet, + filters: Option[JoinGraphInfo]): Option[JoinPlan] = { if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { // Should not join two overlapping item sets. return None } +if (conf.joinReorderDPStarFilter && filters.isDefined) { + // Apply star-join filter, which ensures that tables in a star schema relationship + // are planned together. The star-filter will eliminate joins among star and non-star + // tables until the star joins are built. The following combinations are allowed: + // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join + // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan) + // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join + val isValidJoinCombination = +JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds, --- End diff -- @wzhfy I agree with passing ```conf``` as a parameter to ```buildJoinGraphInfo```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110984951 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- @cloud-fan Thatâs correct. If CBO is enabled, it will do the final planning. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110848185 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- ah, so if users enable the star join reorder and cbo join reorder together, stat join will still be overwritten by cbo join reorder rule? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110831710 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging { } /** - * Builds a new JoinPlan when both conditions hold: + * Builds a new JoinPlan if the following conditions hold: * - the sets of items contained in left and right sides do not overlap. * - there exists at least one join condition involving references from both sides. + * - if star-join filter is enabled, allow the following combinations: + * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join + * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan) + * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join + * * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. * @param otherJoinPlan The other side JoinPlan for building a new join node. * @param conf SQLConf for statistics computation. * @param conditions The overall set of join conditions. * @param topOutput The output attributes of the final plan. + * @param filters Join graph info to be used as filters by the search algorithm. * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, conf: SQLConf, conditions: Set[Expression], - topOutput: AttributeSet): Option[JoinPlan] = { + topOutput: AttributeSet, + filters: Option[JoinGraphInfo]): Option[JoinPlan] = { if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { // Should not join two overlapping item sets. return None } +if (conf.joinReorderDPStarFilter && filters.isDefined) { --- End diff -- Move condition `conf.joinReorderDPStarFilter` to `def buildJoinGraphInfo`? We only need to check it once, if it's turned off, just return an empty filter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110828815 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, --- End diff -- unnecessary? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110830756 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +349,110 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) --- End diff -- The above logic can be simplified: ``` val itemMap = itemIndex.toMap Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet)) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110827587 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging { } /** - * Builds a new JoinPlan when both conditions hold: + * Builds a new JoinPlan if the following conditions hold: * - the sets of items contained in left and right sides do not overlap. * - there exists at least one join condition involving references from both sides. + * - if star-join filter is enabled, allow the following combinations: + * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join + * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan) + * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join + * * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. * @param otherJoinPlan The other side JoinPlan for building a new join node. * @param conf SQLConf for statistics computation. * @param conditions The overall set of join conditions. * @param topOutput The output attributes of the final plan. + * @param filters Join graph info to be used as filters by the search algorithm. * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, conf: SQLConf, conditions: Set[Expression], - topOutput: AttributeSet): Option[JoinPlan] = { + topOutput: AttributeSet, + filters: Option[JoinGraphInfo]): Option[JoinPlan] = { if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { // Should not join two overlapping item sets. return None } +if (conf.joinReorderDPStarFilter && filters.isDefined) { + // Apply star-join filter, which ensures that tables in a star schema relationship + // are planned together. The star-filter will eliminate joins among star and non-star + // tables until the star joins are built. The following combinations are allowed: + // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join + // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan) + // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join + val isValidJoinCombination = +JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds, --- End diff -- Pass the `conf` as a parameter of `def buildJoinGraphInfo`, then `JoinReorderDPFilters` can be an object. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110826959 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging { } /** - * Builds a new JoinPlan when both conditions hold: + * Builds a new JoinPlan if the following conditions hold: * - the sets of items contained in left and right sides do not overlap. * - there exists at least one join condition involving references from both sides. + * - if star-join filter is enabled, allow the following combinations: + * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join + * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan) + * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join + * * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. * @param otherJoinPlan The other side JoinPlan for building a new join node. * @param conf SQLConf for statistics computation. * @param conditions The overall set of join conditions. * @param topOutput The output attributes of the final plan. + * @param filters Join graph info to be used as filters by the search algorithm. * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, conf: SQLConf, conditions: Set[Expression], - topOutput: AttributeSet): Option[JoinPlan] = { + topOutput: AttributeSet, + filters: Option[JoinGraphInfo]): Option[JoinPlan] = { if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { // Should not join two overlapping item sets. return None } +if (conf.joinReorderDPStarFilter && filters.isDefined) { + // Apply star-join filter, which ensures that tables in a star schema relationship + // are planned together. The star-filter will eliminate joins among star and non-star + // tables until the star joins are built. The following combinations are allowed: + // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join + // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan) + // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join + val isValidJoinCombination = +JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds, --- End diff -- This will create a new `JoinReorderDPFilters` instance every time we try to build a join node. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110742657 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -218,28 +220,44 @@ object JoinReorderDP extends PredicateHelper with Logging { } /** - * Builds a new JoinPlan when both conditions hold: + * Builds a new JoinPlan if the following conditions hold: * - the sets of items contained in left and right sides do not overlap. * - there exists at least one join condition involving references from both sides. + * - if star-join filter is enabled, allow the following combinations: + * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join + * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan) + * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join + * * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. * @param otherJoinPlan The other side JoinPlan for building a new join node. * @param conf SQLConf for statistics computation. * @param conditions The overall set of join conditions. * @param topOutput The output attributes of the final plan. + * @param filters Join graph info to be used as filters by the search algorithm. * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, conf: SQLConf, conditions: Set[Expression], - topOutput: AttributeSet): Option[JoinPlan] = { + topOutput: AttributeSet, + filters: Option[JoinGraphInfo]): Option[JoinPlan] = { if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { // Should not join two overlapping item sets. return None } +if (conf.joinReorderDPStarFilter && filters.isDefined) { + // Apply star-join filter, which ensures that tables in a star schema relationship + // are planned together. + val isValidJoinCombination = +JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds, + filters.get) + if (!isValidJoinCombination) return None --- End diff -- @cloud-fan The star filter will eliminate joins among star and non-star tables until the star-joins are build. The assumption is that star-join should be planned together. I will add more comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110742361 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} --- End diff -- @cloud-fan ```outer/inner``` i.e. ```oneJoinPlan/otherJoinPlan``` represent all the plan permutations generated by ```JoinReorderDP```. For example, at level 1, join enumeration will combine plans from level 0 e.g. ```oneJoinPlan = (f1)``` and ```otherJoinPlan = (d1)```. At level 2, it will generate plans from plan combinations at level 0 and level 1 e.g. ```oneJoinPlan = (d2)``` and ```otherJoinPlan = {f1, d1}```, and so on. I will clarify the comment with more details. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110741246 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join --- End diff -- @cloud-fan The ```outer/inner``` represents the join plan combinations generated by ```JoinReorderDP```. ```JoinReorderDP``` calls them ```oneJoinPlan/otherJoinPlan```. I will rename them to align to join DP. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110740755 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- @cloud-fan Star-schema detection is first called to compute the set of tables connected by star-schema relationship e.g. {F1, D1, D2} in our code example. This call does not do any join reordering among the tables. It simply computes the set of tables in a star-schema relationship. Then, DP join enumeration generates all possible plan combinations among the entire set of tables in a the join e.g. {F1, D1}, {F1, T1}, {T2, T3}, etc. Star-filter, if called, will eliminate plan combinations among the star and non-star tables until the star join combinations are built. For example, {F1, D1} combination will be retained since it involves tables in a star schema, but {F1, T1} will be eliminated since it mixes star and non-star tables. Star-filter simply decides what combinations to retain but it will not decide on the order of execution of those tables. The order of the joins within a star-join and for the overall plan is decided by the DP join enumeration. Star-filter only ensures that tables in a star-join are planned together. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110574182 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. --- End diff -- what's the overall approach? Run star join reorder rule first, and make the DP join reorder rule respect star join and keep the join order generated by star join reorder rule? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110573027 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -218,28 +220,44 @@ object JoinReorderDP extends PredicateHelper with Logging { } /** - * Builds a new JoinPlan when both conditions hold: + * Builds a new JoinPlan if the following conditions hold: * - the sets of items contained in left and right sides do not overlap. * - there exists at least one join condition involving references from both sides. + * - if star-join filter is enabled, allow the following combinations: + * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join + * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan) + * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join + * * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. * @param otherJoinPlan The other side JoinPlan for building a new join node. * @param conf SQLConf for statistics computation. * @param conditions The overall set of join conditions. * @param topOutput The output attributes of the final plan. + * @param filters Join graph info to be used as filters by the search algorithm. * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, conf: SQLConf, conditions: Set[Expression], - topOutput: AttributeSet): Option[JoinPlan] = { + topOutput: AttributeSet, + filters: Option[JoinGraphInfo]): Option[JoinPlan] = { if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { // Should not join two overlapping item sets. return None } +if (conf.joinReorderDPStarFilter && filters.isDefined) { + // Apply star-join filter, which ensures that tables in a star schema relationship + // are planned together. + val isValidJoinCombination = +JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds, + filters.get) + if (!isValidJoinCombination) return None --- End diff -- what's the logic here? If it's not a star join then it's not a valid join combination? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110570339 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} --- End diff -- can we also have an example about `outer` and `inner`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110570287 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join --- End diff -- what `outer` and `inner` refer to? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110569876 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( --- End diff -- please add parameter doc, especially for `planIndex`, what does it mean? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110522547 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,14 +54,12 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. val result = // Do reordering if the number of items is appropriate and join conditions exist. // We also need to check if costs of all items can be evaluated. if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && conditions.nonEmpty && items.forall(_.stats(conf).rowCount.isDefined)) { -JoinReorderDP.search(conf, items, conditions, output) +JoinReorderDP(conf).search(conf, items, conditions, output) --- End diff -- Reverted. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110520685 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- Yea I also think we keep the default false. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110495345 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- @ron8hu Thank you. We will keep the default false. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110480494 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- In Spark 2.2, we introduced a couple of new configuration parameters in optimizer area. In order to play on the safe side, we set the default value to false. I suggest that we can change the default value to true AFTER we are sure that the new optimizer feature does not cause any regression. I think the system regression/integration test suites help us make a decision. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110466604 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- @gatorsmile I am also fine with changing the default. @wzhfy What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110438558 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = --- End diff -- @viirya Yes, we can enable the feature by default, but I would like to keep the filter under the config option. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110437532 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr * For cost evaluation, since physical costs for operators are not available currently, we use * cardinalities and sizes to compute costs. */ -object JoinReorderDP extends PredicateHelper with Logging { +case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with Logging { --- End diff -- @gatorsmile I misunderstood your previous comment. Yes, I will revert it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110396402 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- I am fine to keep this conf. I am just thinking whether we should change the default. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110396306 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr * For cost evaluation, since physical costs for operators are not available currently, we use * cardinalities and sizes to compute costs. */ -object JoinReorderDP extends PredicateHelper with Logging { +case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with Logging { --- End diff -- In the future, we can add the change if needed. Now, this change is not being used by this PR. We already pass `conf` in the function call. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110318802 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr * For cost evaluation, since physical costs for operators are not available currently, we use * cardinalities and sizes to compute costs. */ -object JoinReorderDP extends PredicateHelper with Logging { +case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with Logging { --- End diff -- @gatorsmile I would like to control the filters on top of the join enumeration. We might have other filters, e.g. left-deep trees only. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110318621 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- @gatorsmile Regardless of the default value, I still want to control the filters with their own knobs. The filters are applied on top of the join enumeration. They need to have their own control. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110318101 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- @viirya The cost-based optimizer will find the best plan for the star-join. The star filter is a heuristic within join enumeration to limit the join sequences evaluated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110316465 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = --- End diff -- So we can have this as true by default? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110314839 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 (regular table i.e. outside star) +attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 1, avgLen = 4, maxLen = 4),
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110314588 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 (regular table i.e. outside star) +attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 1, avgLen = 4, maxLen = 4),
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110314369 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = --- End diff -- @viirya Star join plans are expected to have an optimal execution based on their referential integrity constraints among the tables. It is a good heuristic. I expect that once CBO is enabled by default, star joins will also be enabled. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110313675 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -54,14 +54,12 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) -// TODO: Compute the set of star-joins and use them in the join enumeration -// algorithm to prune un-optimal plan choices. val result = // Do reordering if the number of items is appropriate and join conditions exist. // We also need to check if costs of all items can be evaluated. if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && conditions.nonEmpty && items.forall(_.stats(conf).rowCount.isDefined)) { -JoinReorderDP.search(conf, items, conditions, output) +JoinReorderDP(conf).search(conf, items, conditions, output) --- End diff -- Revert it back? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110313661 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr * For cost evaluation, since physical costs for operators are not available currently, we use * cardinalities and sizes to compute costs. */ -object JoinReorderDP extends PredicateHelper with Logging { +case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with Logging { --- End diff -- Revert it back? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110313369 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- cc @wzhfy @ron8hu @sameeragarwal @cloud-fan --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110313349 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- The logics will be enabled if and only if both `conf.cboEnabled` and `conf.joinReorderEnabled` are true. Thus, it is safe to be `true` by default? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110309359 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 (regular table i.e. outside star) +attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 1, avgLen = 4, maxLen = 4), +
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110309073 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 (regular table i.e. outside star) +attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20), + nullCount = 1, avgLen = 4, maxLen = 4), +
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110308327 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = --- End diff -- Is there any cases we don't want to enable this if cbo is enabled? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110307898 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && --- End diff -- ok for me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110307786 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- oh. right. forget that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110307666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- `ReorderJoin` is done heuristically. It can be useful when cbo is off. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110306486 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- So do we still need `ReorderJoin`? Looks like we don't need it anymore if we don't care about the order created by it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110305903 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- yes --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110303409 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- If so, with this added filter, `CostBasedJoinReorder` can also let the star join plans together, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110302895 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- > Doesn't this cost-based join reorder rule breaks the order created by ReorderJoin? This is expected from cost based reordering. `ReorderJoin` only puts connected items together, the order among these items is not optimized. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110300420 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- `ReorderJoin` will reorder the star join plans. Doesn't this cost-based join reorder rule breaks the order created by `ReorderJoin`? Here we only ask this rule doesn't try to reorder part of star join plans and non-star join plans, but it still can reorder the order among star join plans. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110221774 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), --- End diff -- @wzhfy I will change the max values. To control the layout of the join plans, I intentionally kept certain stats constant (e.g. size on the non-fact tables) and only varied the rowcount and the number of distinct values. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110213152 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 --- End diff -- @wzhfy I will update the comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110212976 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 +attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(5), + nullCount = 1, avgLen = 4, maxLen = 4), +attr("t1_c2") ->
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110211908 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || +// Join is a subset of non-star +join.subsetOf(nonStarJoins)) + } +} + +/** + * Helper class that keeps information about the join graph as sets of Int. --- End diff -- @wzhfy I will correct that. Thank you. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110211689 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { --- End diff -- @wzhfy I will generalize the star detection algorithm in a follow up PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110211394 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- @viirya The star filter ensures that star-joins will be planned together. It is the cost based optimizer that decide on the best execution plan within a star-join. Let me know if I answer your question. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110210819 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && --- End diff -- @wzhfy @viirya I intentionally added the condition here, in addition to the outside disjointness check, so the star algorithm is self contained. I would prefer to keep it here as well, unless you have strong objections. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110090953 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { --- End diff -- yap, you can see `StarSchemaDetection`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110090773 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { --- End diff -- Seems multi-star detection is not supported? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110090373 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), --- End diff -- Same for some other attributes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110090087 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging { case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0)) }.toMap) +// Build filters from the join graph to be used by the search algorithm. +val filters = JoinReorderDPFilters(conf).buildJoinGraphInfo(items, conditions, itemIndex) + // Build plans for next levels until the last level has only one plan. This plan contains // all items that can be joined, so there's no need to continue. val topOutputSet = AttributeSet(output) -while (foundPlans.size < items.length && foundPlans.last.size > 1) { +while (foundPlans.size < items.length) { --- End diff -- yea we should remove this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110089743 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 +attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(5), + nullCount = 1, avgLen = 4, maxLen = 4), +attr("t1_c2") -> ColumnStat(distinctCount =
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110089115 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D1 (dimension) +attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(20), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D2 (dimension) +attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// D3 (dimension) +attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), +attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4), + +// T1 --- End diff -- add some comment to indicate it's a table not in a star schema? --- If your project is set up for it, you can reply to this email and have your reply appear on
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110089871 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && --- End diff -- +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110088813 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || +// Join is a subset of non-star +join.subsetOf(nonStarJoins)) + } +} + +/** + * Helper class that keeps information about the join graph as sets of Int. --- End diff -- sets of Int -> sets of item ids --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110089241 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala --- @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + + +class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { + + override val conf = new SQLConf().copy( +CASE_SENSITIVE -> true, +CBO_ENABLED -> true, +JOIN_REORDER_ENABLED -> true, +STARSCHEMA_DETECTION -> true, +JOIN_REORDER_DP_STAR_FILTER -> true) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Operator Optimizations", FixedPoint(100), +CombineFilters, +PushDownPredicate, +ReorderJoin(conf), +PushPredicateThroughJoin, +ColumnPruning, +CollapseProject) :: +Batch("Join Reorder", Once, + CostBasedJoinReorder(conf)) :: Nil + } + + private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( +// F1 (fact table) +attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50), --- End diff -- It's an integer attribute, if its value range is [1, 50], ndv can't be larger than 50. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110083521 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && + // Either star or non-star is empty + (starJoins.isEmpty || nonStarJoins.isEmpty || +// Join is a subset of the star-join +join.subsetOf(starJoins) || +// Star-join is a subset of join +starJoins.subsetOf(join) || --- End diff -- Can't a star join within a built join be reordered? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110078548 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging { case class Cost(card: BigInt, size: BigInt) { def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } + +/** + * Implements optional filters to reduce the search space for join enumeration. + * + * 1) Star-join filters: Plan star-joins together since they are assumed + *to have an optimal execution based on their RI relationship. + * 2) Cartesian products: Defer their planning later in the graph to avoid + *large intermediate results (expanding joins, in general). + * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing + * intermediate results. + * + * Filters (2) and (3) are not implemented. + */ +case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper { + /** + * Builds join graph information to be used by the filtering strategies. + * Currently, it builds the sets of star/non-star joins. + * It can be extended with the sets of connected/unconnected joins, which + * can be used to filter Cartesian products. + */ + def buildJoinGraphInfo( + items: Seq[LogicalPlan], + conditions: Set[Expression], + planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = { + +// Compute the tables in a star-schema relationship. +val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) +val nonStarJoin = items.filterNot(starJoin.contains(_)) + +if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { + val (starInt, nonStarInt) = planIndex.collect { +case (p, i) if starJoin.contains(p) => + (Some(i), None) +case (p, i) if nonStarJoin.contains(p) => + (None, Some(i)) +case _ => + (None, None) + }.unzip + Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet)) +} else { + // Nothing interesting to return. + None +} + } + + /** + * Applies star-join filter. + * + * Given the outer/inner and the star/non-star sets, + * the following plan combinations are allowed: + * 1) (outer U inner) is a subset of star-join + * 2) star-join is a subset of (outer U inner) + * 3) (outer U inner) is a subset of non star-join + * + * It assumes the sets are disjoint. + * + * Example query graph: + * + * t1 d1 - t2 - t3 + * \ / + * f1 + * | + * d2 + * + * star: {d1, f1, d2} + * non-star: {t2, t1, t3} + * + * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 ) + * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 } + * level 2: {d2 f1 d1 } + * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 } + * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 } + * level 5: {d1 t3 t2 f1 t1 d2 } + */ + def starJoinFilter( + outer: Set[Int], + inner: Set[Int], + filters: JoinGraphInfo) : Boolean = { +val starJoins = filters.starJoins +val nonStarJoins = filters.nonStarJoins +val join = outer.union(inner) + +// Disjoint sets +outer.intersect(inner).isEmpty && --- End diff -- I think the check in the beginning of `buildJoin` can guarantee this already. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ioana-delaney commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110076651 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging { case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0)) }.toMap) +// Build filters from the join graph to be used by the search algorithm. +val filters = JoinReorderDPFilters(conf).buildJoinGraphInfo(items, conditions, itemIndex) + // Build plans for next levels until the last level has only one plan. This plan contains // all items that can be joined, so there's no need to continue. val topOutputSet = AttributeSet(output) -while (foundPlans.size < items.length && foundPlans.last.size > 1) { +while (foundPlans.size < items.length) { --- End diff -- @wzhfy Condition "foundPlans.last.size > 1" does not apply when filters are used with the join enumeration since not all the plan combinations are generated. Without filters, I think the condition will always be satisfied, so I removed it completely. Let me know if you have a counter example. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
GitHub user ioana-delaney opened a pull request: https://github.com/apache/spark/pull/17546 [SPARK-20233] [SQL] Apply star-join filter heuristics to dynamic programming join enumeration ## What changes were proposed in this pull request? Implements star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph: ``` T1 D1 - T2 - T3 \ / F1 | D2 star-join: {F1, D1, D2} non-star: {T1, T2, T3} ``` The following join combinations will be generated: ``` level 0: (F1), (D1), (D2), (T1), (T2), (T3) level 1: {F1, D1}, {F1, D2}, {T2, T3} level 2: {F1, D1, D2} level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2} level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3 } level 6: {F1, D1, D2, T1, T2, T3} ``` ## How was this patch tested? New test suite ```StarJOinCostBasedReorderSuite.scala```. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ioana-delaney/spark starSchemaCBOv3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17546.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17546 commit 5eb9b30d4d3ae4d6bc4d137af5ec09a876df805a Author: Ioana DelaneyDate: 2017-04-06T04:04:20Z [SPARK-20233] Implement star-join filter for join enumeration. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org