[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17546





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r94456
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *    intermediate results.
--- End diff --

@cloud-fan Left-deep trees are executed in a pipelined fashion. Given the following join trees:
```
left-deep tree:

         join
        /    \
      join    t3
     /    \
    t1     t2

bushy tree:

         join
        /    \
      t1     join
            /    \
          t2      t3
```
The bushy-tree plan (right-deep in this case) requires the result of (t2 join t3) to be materialized before joining with t1. The left-deep tree doesn't have this requirement.
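To make the shape difference concrete, here is a minimal, self-contained Scala sketch; `Plan`, `Table`, and `Join` are made-up illustration types, not the Catalyst API:
```
// Toy plan algebra, not Spark code: it only illustrates the two tree shapes.
sealed trait Plan
case class Table(name: String) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

// Left-deep: every right child is a base table, so the running join result
// can be pipelined into the next join operator without being materialized.
val leftDeep = Join(Join(Table("t1"), Table("t2")), Table("t3"))

// Bushy (right-deep here): the composite inner Join(t2, t3) must produce its
// full result before the join with t1 can consume it.
val bushy = Join(Table("t1"), Join(Table("t2"), Table("t3")))
```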






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r92785
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
--- End diff --

@cloud-fan DP join enumeration relies on ```ReorderJoin``` to pull up the 
Cartesian products. Instead, I think that Cartesian pull-up should be 
implemented as another filter on top of DP join enumeration.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r92197
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
       case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
     }.toMap)

+    // Build filters from the join graph to be used by the search algorithm.
+    val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
--- End diff --

@cloud-fan I call it "filters" since the join graph information is used as 
filters on top of the DP join enumeration. It suggests the purpose for which 
the graph info was gathered.

If this is confusing, I can rename. Let me know.






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r91433
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
--- End diff --

@viirya Star-schema detection is called from both ```CostBasedJoinReorder``` and ```ReorderJoin```. In the latter case, it is called to reorder star-joins based on heuristics when CBO is disabled.

When the cost-based optimizer becomes the default optimizer, we won't need to reorder star-joins in ```ReorderJoin``` based on heuristics, since the cost-based optimizer will choose the best plan based on cost.






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r111098712
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *    intermediate results.
--- End diff --

I don't get it; doesn't a left-deep tree materialize intermediate results too?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r111098412
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
--- End diff --

We already have this logic in the DP join reorder algorithm.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r111098128
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
       case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
     }.toMap)

+    // Build filters from the join graph to be used by the search algorithm.
+    val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
--- End diff --

Why call it filters? Should we name it `planInfo`?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r111081203
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
--- End diff --

What's the meaning of "remove the call from `ReorderJoin`"?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r111063912
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
--- End diff --

@cloud-fan Once CBO is enabled by default, I can remove the call from 
```ReorderJoin```.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r111058094
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
--- End diff --

Do we have a plan to completely merge these two rules?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110989528
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
--- End diff --

Removed the star-filter dependency on the ```STARSCHEMA_DETECTION``` conf. When star-schema detection is called from CBO, it is under the control of ```JOIN_REORDER_DP_STAR_FILTER```. When called from ```ReorderJoin```, it is under the control of the ```STARSCHEMA_DETECTION``` conf.
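For reference, the two switches are independent and can be enabled together. A sketch in the same conf style as the suite quoted above (illustrative only, not a quote from the test):
```
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._

// Heuristic path (ReorderJoin) and CBO path (DP star-filter), respectively.
val conf = new SQLConf().copy(
  STARSCHEMA_DETECTION -> true,
  JOIN_REORDER_DP_STAR_FILTER -> true)
```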






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110988847
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala ---
@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {

     val emptyStarJoinPlan = Seq.empty[LogicalPlan]

-    if (!conf.starSchemaDetection || input.size < 2) {
+    if (input.size < 2) {
--- End diff --

Removed unnecessary call to conf.starSchemaDetection. 





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110987936
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }

   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    * - the sets of items contained in left and right sides do not overlap.
    * - there exists at least one join condition involving references from both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *     1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *     2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *     3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }

+    if (conf.joinReorderDPStarFilter && filters.isDefined) {
--- End diff --

@wzhfy I moved the ```conf``` condition inside ```buildJoinGraphInfo```.
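Putting the fragments quoted in this thread together, the resulting shape is roughly the sketch below; this is a reconstruction for readers of the thread, not the exact merged code:
```
object JoinReorderDPFilters extends PredicateHelper {
  def buildJoinGraphInfo(
      conf: SQLConf,
      items: Seq[LogicalPlan],
      conditions: Set[Expression],
      itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
    if (conf.joinReorderDPStarFilter) {
      // Compute the tables in a star-schema relationship.
      val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
      val nonStarJoin = items.filterNot(starJoin.contains(_))
      if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
        val itemMap = itemIndex.toMap
        Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
      } else {
        // Nothing interesting to return.
        None
      }
    } else {
      // Star-filter disabled: return an empty filter.
      None
    }
  }
}
```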






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110987408
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +349,110 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *    intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+    // Compute the tables in a star-schema relationship.
+    val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+    val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+    if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+      val (starInt, nonStarInt) = planIndex.collect {
+        case (p, i) if starJoin.contains(p) =>
+          (Some(i), None)
+        case (p, i) if nonStarJoin.contains(p) =>
+          (None, Some(i))
+        case _ =>
+          (None, None)
+      }.unzip
+      Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
--- End diff --

@wzhfy This is a very useful code simplification. Thank you!





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110987294
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+    CASE_SENSITIVE -> true,
--- End diff --

@wzhfy Removed.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110987218
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }

   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    * - the sets of items contained in left and right sides do not overlap.
    * - there exists at least one join condition involving references from both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *     1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *     2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *     3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }

+    if (conf.joinReorderDPStarFilter && filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema relationship
+      // are planned together. The star-filter will eliminate joins among star and non-star
+      // tables until the star joins are built. The following combinations are allowed:
+      // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+      // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+      // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+      val isValidJoinCombination =
+        JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
--- End diff --

@wzhfy I agree with passing ```conf``` as a parameter to 
```buildJoinGraphInfo```.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110984951
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
--- End diff --

@cloud-fan That’s correct. If CBO is enabled, it will do the final 
planning.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110848185
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
--- End diff --

Ah, so if users enable the star-join reorder and the CBO join reorder together, the star-join order will still be overwritten by the CBO join reorder rule?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110831710
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }

   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    * - the sets of items contained in left and right sides do not overlap.
    * - there exists at least one join condition involving references from both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *     1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *     2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *     3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }

+    if (conf.joinReorderDPStarFilter && filters.isDefined) {
--- End diff --

Move the condition `conf.joinReorderDPStarFilter` into `def buildJoinGraphInfo`? We only need to check it once; if it's turned off, just return an empty filter.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110828815
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+    CASE_SENSITIVE -> true,
--- End diff --

unnecessary?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110830756
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +349,110 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *    intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+    // Compute the tables in a star-schema relationship.
+    val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+    val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+    if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+      val (starInt, nonStarInt) = planIndex.collect {
+        case (p, i) if starJoin.contains(p) =>
+          (Some(i), None)
+        case (p, i) if nonStarJoin.contains(p) =>
+          (None, Some(i))
+        case _ =>
+          (None, None)
+      }.unzip
+      Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
--- End diff --

The above logic can be simplified:
```
val itemMap = itemIndex.toMap
Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
```
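A self-contained plain-Scala illustration of why the `Map`-based lookup is equivalent; strings stand in for `LogicalPlan`s and all values are made up:
```
val items = Seq("f1", "d1", "d2", "t1", "t2", "t3")   // stand-ins for plans
val itemIndex = items.zipWithIndex                    // Seq[(String, Int)]
val starJoin = Seq("f1", "d1", "d2")
val nonStarJoin = items.filterNot(starJoin.contains(_))

// One Map lookup per plan replaces the collect/unzip/flatten steps.
val itemMap = itemIndex.toMap
val starIds = starJoin.map(itemMap).toSet       // Set(0, 1, 2)
val nonStarIds = nonStarJoin.map(itemMap).toSet // Set(3, 4, 5)
```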





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110827587
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }

   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    * - the sets of items contained in left and right sides do not overlap.
    * - there exists at least one join condition involving references from both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *     1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *     2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *     3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }

+    if (conf.joinReorderDPStarFilter && filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema relationship
+      // are planned together. The star-filter will eliminate joins among star and non-star
+      // tables until the star joins are built. The following combinations are allowed:
+      // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+      // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+      // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+      val isValidJoinCombination =
+        JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
--- End diff --

Pass the `conf` as a parameter of `def buildJoinGraphInfo`, then 
`JoinReorderDPFilters` can be an object.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-11 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110826959
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -218,28 +220,48 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }

   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    * - the sets of items contained in left and right sides do not overlap.
    * - there exists at least one join condition involving references from both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *     1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *     2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *     3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }

+    if (conf.joinReorderDPStarFilter && filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema relationship
+      // are planned together. The star-filter will eliminate joins among star and non-star
+      // tables until the star joins are built. The following combinations are allowed:
+      // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+      // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+      // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+      val isValidJoinCombination =
+        JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
--- End diff --

This will create a new `JoinReorderDPFilters` instance every time we try to 
build a join node.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-10 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110742657
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -218,28 +220,44 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }

   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    * - the sets of items contained in left and right sides do not overlap.
    * - there exists at least one join condition involving references from both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *     1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *     2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *     3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }

+    if (conf.joinReorderDPStarFilter && filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema relationship
+      // are planned together.
+      val isValidJoinCombination =
+        JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
+          filters.get)
+      if (!isValidJoinCombination) return None
--- End diff --

@cloud-fan The star filter will eliminate joins among star and non-star tables until the star-joins are built. The assumption is that star-joins should be planned together. I will add more comments.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-10 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110742361
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *    intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+    // Compute the tables in a star-schema relationship.
+    val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+    val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+    if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+      val (starInt, nonStarInt) = planIndex.collect {
+        case (p, i) if starJoin.contains(p) =>
+          (Some(i), None)
+        case (p, i) if nonStarJoin.contains(p) =>
+          (None, Some(i))
+        case _ =>
+          (None, None)
+      }.unzip
+      Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+    } else {
+      // Nothing interesting to return.
+      None
+    }
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   *   t1   d1 - t2 - t3
+   *    \  /
+   *     f1
+   *     |
+   *     d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}

@cloud-fan ```outer/inner``` i.e. ```oneJoinPlan/otherJoinPlan``` represent 
all the plan permutations generated by ```JoinReorderDP```. For example, at 
level 1, join enumeration will combine plans from level 0 e.g. ```oneJoinPlan = 
(f1)``` and ```otherJoinPlan = (d1)```. At level 2, it will generate plans from 
plan combinations at level 0 and level 1 e.g. ```oneJoinPlan = (d2)``` and 
```otherJoinPlan = {f1, d1}```, and so on. I will clarify the comment with more 
details.
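To make the three combinations concrete, here is a minimal sketch of the predicate they describe, with the star/non-star sets flattened into explicit `Set[Int]` parameters for illustration (the real method takes a `JoinGraphInfo`):
```
def starJoinFilter(
    outer: Set[Int],
    inner: Set[Int],
    starJoin: Set[Int],
    nonStarJoin: Set[Int]): Boolean = {
  val union = outer.union(inner)
  // 1) (outer U inner) is a subset of the star-join
  union.subsetOf(starJoin) ||
    // 2) the star-join is a subset of (outer U inner)
    starJoin.subsetOf(union) ||
    // 3) (outer U inner) is a subset of the non star-join
    union.subsetOf(nonStarJoin)
}
```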





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-10 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110741246
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *    intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+    // Compute the tables in a star-schema relationship.
+    val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+    val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+    if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+      val (starInt, nonStarInt) = planIndex.collect {
+        case (p, i) if starJoin.contains(p) =>
+          (Some(i), None)
+        case (p, i) if nonStarJoin.contains(p) =>
+          (None, Some(i))
+        case _ =>
+          (None, None)
+      }.unzip
+      Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+    } else {
+      // Nothing interesting to return.
+      None
+    }
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
--- End diff --

@cloud-fan The ```outer/inner``` sets represent the join plan combinations generated by ```JoinReorderDP```. ```JoinReorderDP``` calls them ```oneJoinPlan/otherJoinPlan```. I will rename them to align with the join DP.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-10 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110740755
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
--- End diff --

@cloud-fan Star-schema detection is first called to compute the set of 
tables connected by a star-schema relationship, e.g. {F1, D1, D2} in our 
code example. This call does not do any join reordering among the tables; it 
simply computes the set of tables in a star-schema relationship. Then, DP 
join enumeration generates all possible plan combinations among the entire 
set of tables in the join, e.g. {F1, D1}, {F1, T1}, {T2, T3}, etc. The star 
filter, if called, eliminates plan combinations that mix star and non-star 
tables until the star-join combinations are built. For example, the {F1, D1} 
combination will be retained since it involves tables in a star schema, but 
{F1, T1} will be eliminated since it mixes star and non-star tables. The 
star filter only decides which combinations to retain; it does not decide 
the order of execution of those tables. The order of the joins within a 
star-join, and for the overall plan, is decided by the DP join enumeration. 
The star filter only ensures that tables in a star-join are planned together.
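
For illustration, here is a minimal standalone sketch of that pruning rule, 
not the actual Spark code. The plan ids are hypothetical: 0=F1, 1=D1, 2=D2 
form the star set, and 3=T1, 4=T2, 5=T3 the non-star set.
```scala
object StarFilterSketch {
  val star: Set[Int] = Set(0, 1, 2)     // {F1, D1, D2}
  val nonStar: Set[Int] = Set(3, 4, 5)  // {T1, T2, T3}

  // A combination is retained if it stays within the star set, stays
  // within the non-star set, or already contains the complete star.
  def retain(oneSide: Set[Int], otherSide: Set[Int]): Boolean = {
    val join = oneSide union otherSide
    oneSide.intersect(otherSide).isEmpty &&
      (join.subsetOf(star) || star.subsetOf(join) || join.subsetOf(nonStar))
  }

  def main(args: Array[String]): Unit = {
    println(retain(Set(0), Set(1)))       // {F1, D1}: retained, within star
    println(retain(Set(0), Set(3)))       // {F1, T1}: pruned, mixes the sets
    println(retain(Set(4), Set(5)))       // {T2, T3}: retained, within non-star
    println(retain(Set(0, 1, 2), Set(3))) // star complete, T1 joins: retained
  }
}
```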






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110574182
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends 
Rule[LogicalPlan] with Pr
 
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): 
LogicalPlan = {
 val (items, conditions) = extractInnerJoins(plan)
-// TODO: Compute the set of star-joins and use them in the join 
enumeration
-// algorithm to prune un-optimal plan choices.
--- End diff --

What's the overall approach? Run the star-join reorder rule first, and make 
the DP join reorder rule respect the star join and keep the join order 
generated by the star-join reorder rule?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110573027
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -218,28 +220,44 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
   }
 
   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
* - the sets of items contained in left and right sides do not overlap.
* - there exists at least one join condition involving references from 
both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
* @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
* @param otherJoinPlan The other side JoinPlan for building a new join 
node.
* @param conf SQLConf for statistics computation.
* @param conditions The overall set of join conditions.
* @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search 
algorithm.
* @return Builds and returns a new JoinPlan if both conditions hold. 
Otherwise, returns None.
*/
   private def buildJoin(
   oneJoinPlan: JoinPlan,
   otherJoinPlan: JoinPlan,
   conf: SQLConf,
   conditions: Set[Expression],
-  topOutput: AttributeSet): Option[JoinPlan] = {
+  topOutput: AttributeSet,
+  filters: Option[JoinGraphInfo]): Option[JoinPlan] = {
 
 if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
   // Should not join two overlapping item sets.
   return None
 }
 
+if (conf.joinReorderDPStarFilter && filters.isDefined) {
+  // Apply star-join filter, which ensures that tables in a star 
schema relationship
+  // are planned together.
+  val isValidJoinCombination =
+JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, 
otherJoinPlan.itemIds,
+  filters.get)
+  if (!isValidJoinCombination) return None
--- End diff --

What's the logic here? If it's not a star join, then it's not a valid join 
combination?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110570339
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
--- End diff --

Can we also have an example of `outer` and `inner`?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110570287
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
--- End diff --

What do `outer` and `inner` refer to?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110569876
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
--- End diff --

Please add parameter docs, especially for `planIndex`: what does it mean?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-08 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110522547
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -54,14 +54,12 @@ case class CostBasedJoinReorder(conf: SQLConf) extends 
Rule[LogicalPlan] with Pr
 
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): 
LogicalPlan = {
 val (items, conditions) = extractInnerJoins(plan)
-// TODO: Compute the set of star-joins and use them in the join 
enumeration
-// algorithm to prune un-optimal plan choices.
 val result =
   // Do reordering if the number of items is appropriate and join 
conditions exist.
   // We also need to check if costs of all items can be evaluated.
   if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && 
conditions.nonEmpty &&
   items.forall(_.stats(conf).rowCount.isDefined)) {
-JoinReorderDP.search(conf, items, conditions, output)
+JoinReorderDP(conf).search(conf, items, conditions, output)
--- End diff --

Reverted.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-08 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110520685
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

Yeah, I also think we should keep the default false.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110495345
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

@ron8hu Thank you. We will keep the default false.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110480494
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

In Spark 2.2, we introduced a couple of new configuration parameters in the 
optimizer area. To be on the safe side, we set the default value to false. I 
suggest we change the default value to true AFTER we are sure that the new 
optimizer feature does not cause any regression. I think the system 
regression/integration test suites can help us make that decision.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110466604
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

@gatorsmile  I am also fine with changing the default.
@wzhfy What do you think?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110438558
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
--- End diff --

@viirya Yes, we can enable the feature by default, but I would like to keep 
the filter under the config option.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110437532
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends 
Rule[LogicalPlan] with Pr
  * For cost evaluation, since physical costs for operators are not 
available currently, we use
  * cardinalities and sizes to compute costs.
  */
-object JoinReorderDP extends PredicateHelper with Logging {
+case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with 
Logging {
--- End diff --

@gatorsmile I misunderstood your previous comment. Yes, I will revert it.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110396402
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

I am fine with keeping this conf. I am just wondering whether we should 
change the default.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110396306
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends 
Rule[LogicalPlan] with Pr
  * For cost evaluation, since physical costs for operators are not 
available currently, we use
  * cardinalities and sizes to compute costs.
  */
-object JoinReorderDP extends PredicateHelper with Logging {
+case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with 
Logging {
--- End diff --

In the future, we can add this change if needed. For now, it is not used by 
this PR. We already pass `conf` in the function call.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110318802
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends 
Rule[LogicalPlan] with Pr
  * For cost evaluation, since physical costs for operators are not 
available currently, we use
  * cardinalities and sizes to compute costs.
  */
-object JoinReorderDP extends PredicateHelper with Logging {
+case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with 
Logging {
--- End diff --

@gatorsmile I would like to control the filters on top of the join 
enumeration. We might have other filters, e.g. left-deep trees only.
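
As a purely hypothetical sketch of such an additional filter, not part of 
this PR: a left-deep restriction could be expressed in the same shape as the 
star filter, rejecting any combination whose right side is itself a 
composite plan.
```scala
// Hypothetical left-deep filter: in a left-deep tree every join step adds
// exactly one base relation on the right side.
object LeftDeepFilterSketch {
  def leftDeepFilter(oneJoinPlan: Set[Int], otherJoinPlan: Set[Int]): Boolean =
    otherJoinPlan.size == 1

  def main(args: Array[String]): Unit = {
    println(leftDeepFilter(Set(0, 1), Set(2)))    // true: adds one relation
    println(leftDeepFilter(Set(0, 1), Set(2, 3))) // false: bushy combination
  }
}
```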





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110318621
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

@gatorsmile Regardless of the default value, I still want to control the 
filters with their own knobs. The filters are applied on top of the join 
enumeration, so they need their own control.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110318101
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

@viirya The cost-based optimizer will find the best plan for the star-join. 
The star filter is a heuristic within the join enumeration that limits the 
join sequences evaluated.
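
To make the reduction concrete, here is a small self-contained sketch that 
counts how many disjoint (outer, inner) candidate pairs survive the filter 
for the example graph above (star = {d1, f1, d2}, non-star = {t1, t2, t3}). 
The ids are hypothetical, and the cost model and connectivity checks of the 
real enumeration are intentionally omitted.
```scala
object StarFilterSearchSpace {
  val star: Set[Int] = Set(1, 4, 5)     // {d1, f1, d2}
  val nonStar: Set[Int] = Set(0, 2, 3)  // {t1, t2, t3}
  val items: Set[Int] = star union nonStar

  // The three allowed combinations from the doc comment above; the pairs
  // below are already disjoint, so that check is done by the caller.
  def starJoinFilter(outer: Set[Int], inner: Set[Int]): Boolean = {
    val join = outer union inner
    join.subsetOf(star) || star.subsetOf(join) || join.subsetOf(nonStar)
  }

  def main(args: Array[String]): Unit = {
    val subsets = items.subsets().filter(_.nonEmpty).toSeq
    val disjointPairs = for {
      outer <- subsets
      inner <- subsets
      if outer.intersect(inner).isEmpty
    } yield (outer, inner)
    val kept = disjointPairs.count { case (o, i) => starJoinFilter(o, i) }
    println(s"candidate pairs: ${disjointPairs.size}, after filter: $kept")
  }
}
```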





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110316465
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
--- End diff --

So we can have this as true by default?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110314839
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1 (regular table i.e. outside star)
+attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 1, avgLen = 4, maxLen = 4),
  

[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110314588
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1 (regular table i.e. outside star)
+attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 1, avgLen = 4, maxLen = 4),
  

[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110314369
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
--- End diff --

@viirya Star-join plans are expected to have an optimal execution based on 
the referential integrity (RI) constraints among the tables. It is a good 
heuristic. I expect that once CBO is enabled by default, star joins will 
also be enabled.






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110313675
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -54,14 +54,12 @@ case class CostBasedJoinReorder(conf: SQLConf) extends 
Rule[LogicalPlan] with Pr
 
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): 
LogicalPlan = {
 val (items, conditions) = extractInnerJoins(plan)
-// TODO: Compute the set of star-joins and use them in the join 
enumeration
-// algorithm to prune un-optimal plan choices.
 val result =
   // Do reordering if the number of items is appropriate and join 
conditions exist.
   // We also need to check if costs of all items can be evaluated.
   if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && 
conditions.nonEmpty &&
   items.forall(_.stats(conf).rowCount.isDefined)) {
-JoinReorderDP.search(conf, items, conditions, output)
+JoinReorderDP(conf).search(conf, items, conditions, output)
--- End diff --

Revert it back?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110313661
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends 
Rule[LogicalPlan] with Pr
  * For cost evaluation, since physical costs for operators are not 
available currently, we use
  * cardinalities and sizes to compute costs.
  */
-object JoinReorderDP extends PredicateHelper with Logging {
+case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with 
Logging {
--- End diff --

Revert it back?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110313369
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

cc @wzhfy @ron8hu @sameeragarwal @cloud-fan 





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110313349
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

The logic will be enabled if and only if both `conf.cboEnabled` and 
`conf.joinReorderEnabled` are true. Thus, is it safe to make it `true` by 
default?
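
For reference, a hedged usage sketch of how the three knobs line up. The CBO 
and join-reorder keys are the standard ones those `conf` fields read; the 
star-filter key is the one added by this PR (default false).
```scala
import org.apache.spark.sql.SparkSession

object EnableStarFilter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("star-filter-config")
      // The star filter is consulted only when both CBO and join
      // reordering are enabled.
      .config("spark.sql.cbo.enabled", "true")
      .config("spark.sql.cbo.joinReorder.enabled", "true")
      // Conf key introduced by this PR.
      .config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
      .getOrCreate()

    // ... run star-schema queries here ...

    spark.stop()
  }
}
```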





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110309359
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1 (regular table i.e. outside star)
+attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 1, avgLen = 4, maxLen = 4),
+

[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110309073
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1 (regular table i.e. outside star)
+attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(20),
+  nullCount = 1, avgLen = 4, maxLen = 4),
+

[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110308327
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
--- End diff --

Are there any cases where we wouldn't want to enable this if CBO is enabled?
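
One conceivable case, as a sketch: keeping CBO's reordering but dropping only
the extra pruning, e.g. to compare plan quality when star detection misfires.
The conf key below is an assumption, following the `spark.sql.cbo.joinReorder.*`
naming of the sibling flags in this diff:

```scala
// Hypothetical per-session toggle; `spark` is an active SparkSession and
// the exact key name is assumed, not confirmed by this diff.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.dp.star.filter", "false")
```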





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110307898
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
--- End diff --

ok for me.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110307786
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

oh. right. forget that.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110307666
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

`ReorderJoin` works heuristically. It can be useful when CBO is off.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110306486
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

So do we still need `ReorderJoin`? Looks like we don't need it anymore if 
we don't care about the order created by it.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110305903
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

yes





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110303409
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

If so, with this added filter, `CostBasedJoinReorder` can also keep the star-join 
plans together, right?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110302895
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

> Doesn't this cost-based join reorder rule break the order created by 
`ReorderJoin`?

This is expected from cost-based reordering. `ReorderJoin` only puts 
connected items together; the order among those items is not optimized.
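
To make the division of labor concrete, here is a minimal, self-contained 
sketch (not Spark's actual code) of level-wise DP enumeration with a pluggable 
search-space filter such as the star-join filter; join-graph connectivity and 
cost bookkeeping are omitted for brevity:

```scala
// Each candidate plan is represented only by the set of item indexes it
// covers; `admit` is the search-space filter hook.
def enumerateLevels(n: Int, admit: (Set[Int], Set[Int]) => Boolean): Seq[Set[Set[Int]]] = {
  val level0: Set[Set[Int]] = (0 until n).map(Set(_)).toSet
  (1 until n).foldLeft(Seq(level0)) { (levels, level) =>
    val built = levels.flatten.toSet
    val next = for {
      outer <- built
      inner <- built
      // Combine disjoint sub-plans whose sizes sum to this level's item count.
      if outer.size + inner.size == level + 1 && admit(outer, inner)
    } yield outer ++ inner
    levels :+ next
  }
}

// With only the disjointness check, level k holds every (k+1)-item subset:
enumerateLevels(3, (o, i) => o.intersect(i).isEmpty).map(_.size)  // Seq(3, 3, 1)
```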





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110300420
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

`ReorderJoin` will reorder the star-join plans. Doesn't this cost-based 
join reorder rule break the order created by `ReorderJoin`? Here we only ask 
this rule not to mix partial star-join plans with non-star-join plans, but it 
can still reorder the plans within the star join.






[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110221774
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
--- End diff --

@wzhfy I will change the max values. 

To control the layout of the join plans, I intentionally kept certain stats 
constant (e.g., the size of the non-fact tables) and only varied the row count 
and the number of distinct values.
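
For readers of the test: the reason varying the distinct counts alone steers 
the plan shape is the classic equi-join cardinality estimate, which divides 
the cross product by the larger distinct count of the join keys. A rough 
sketch of that arithmetic (not Spark's exact estimator), assuming table row 
counts matching the key distinct counts above:

```scala
// |A join B on A.k = B.k| ~= |A| * |B| / max(ndv(A.k), ndv(B.k))
def estimatedJoinCard(rowsA: BigInt, rowsB: BigInt, ndvA: BigInt, ndvB: BigInt): BigInt =
  rowsA * rowsB / (ndvA max ndvB)

estimatedJoinCard(100, 10, 100, 10)  // F1 (100 rows) join D3 (10 rows): ~10 rows
estimatedJoinCard(100, 20, 100, 20)  // F1 join T1 (20 rows): ~20 rows
```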





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110213152
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1
--- End diff --

@wzhfy I will update the comment.



[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110212976
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1
+attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(5),
+  nullCount = 1, avgLen = 4, maxLen = 4),
+attr("t1_c2") -> 

[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110211908
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
+// Join is a subset of non-star
+join.subsetOf(nonStarJoins))
+  }
+}
+
+/**
+ * Helper class that keeps information about the join graph as sets of Int.
--- End diff --

@wzhfy I will correct that. Thank you.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110211689
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
--- End diff --

@wzhfy I will generalize the star detection algorithm in a follow-up PR.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110211394
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

@viirya The star filter ensures that star-joins will be planned together. 
It is the cost-based optimizer that decides on the best execution plan within a 
star-join. Let me know if that answers your question.
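
To make the pruning concrete, here is a minimal, self-contained sketch of 
the filter logic quoted in this diff; the index sets are hypothetical 
stand-ins for star/non-star tables, not values from a real plan:
```
// Sketch of the starJoinFilter logic quoted above.
// Hypothetical indexes: star = {0, 1, 2}, non-star = {3, 4, 5}.
case class JoinGraphInfo(starJoins: Set[Int], nonStarJoins: Set[Int])

def starJoinFilter(outer: Set[Int], inner: Set[Int], filters: JoinGraphInfo): Boolean = {
  val join = outer.union(inner)
  outer.intersect(inner).isEmpty &&
    (filters.starJoins.isEmpty || filters.nonStarJoins.isEmpty ||
      join.subsetOf(filters.starJoins) ||   // still assembling the star
      filters.starJoins.subsetOf(join) ||   // star fully planned; extend it
      join.subsetOf(filters.nonStarJoins))  // pure non-star combination
}

val info = JoinGraphInfo(starJoins = Set(0, 1, 2), nonStarJoins = Set(3, 4, 5))
assert(starJoinFilter(Set(0), Set(1), info))        // within the star: kept
assert(!starJoinFilter(Set(0), Set(3), info))       // mixes star/non-star too early: pruned
assert(starJoinFilter(Set(0, 1, 2), Set(3), info))  // star complete, add a non-star table: kept
assert(!starJoinFilter(Set(0, 1), Set(1, 2), info)) // overlapping sides: rejected by the guard
```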





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110210819
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
--- End diff --

@wzhfy @viirya I intentionally added the condition here, in addition to the 
outside disjointness check, so that the star algorithm is self-contained. I would 
prefer to keep it here as well, unless you have strong objections.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110090953
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
--- End diff --

Yep, you can see `StarSchemaDetection`.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110090773
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
--- End diff --

Seems multi-star detection is not supported?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110090373
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
--- End diff --

Same for some other attributes.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110090087
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
   case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 
0))
 }.toMap)
 
+// Build filters from the join graph to be used by the search 
algorithm.
+val filters = JoinReorderDPFilters(conf).buildJoinGraphInfo(items, 
conditions, itemIndex)
+
 // Build plans for next levels until the last level has only one plan. 
This plan contains
 // all items that can be joined, so there's no need to continue.
 val topOutputSet = AttributeSet(output)
-while (foundPlans.size < items.length && foundPlans.last.size > 1) {
+while (foundPlans.size < items.length) {
--- End diff --

Yeah, we should remove this.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110089743
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1
+attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(5),
+  nullCount = 1, avgLen = 4, maxLen = 4),
+attr("t1_c2") -> ColumnStat(distinctCount = 

[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110089115
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D1 (dimension)
+attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(100),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = 
Some(20),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D2 (dimension)
+attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// D3 (dimension)
+attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(5),
+  nullCount = 0, avgLen = 4, maxLen = 4),
+
+// T1
--- End diff --

Add a comment to indicate it's a table that is not part of a star schema?



[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110089871
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
--- End diff --

+1





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110088813
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
+// Join is a subset of non-star
+join.subsetOf(nonStarJoins))
+  }
+}
+
+/**
+ * Helper class that keeps information about the join graph as sets of Int.
--- End diff --

sets of Int -> sets of item ids





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-06 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110089241
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import 
org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, 
StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+CASE_SENSITIVE -> true,
+CBO_ENABLED -> true,
+JOIN_REORDER_ENABLED -> true,
+STARSCHEMA_DETECTION -> true,
+JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Operator Optimizations", FixedPoint(100),
+CombineFilters,
+PushDownPredicate,
+ReorderJoin(conf),
+PushPredicateThroughJoin,
+ColumnPruning,
+CollapseProject) ::
+Batch("Join Reorder", Once,
+  CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+// F1 (fact table)
+attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = 
Some(50),
--- End diff --

It's an integer attribute; if its value range is [1, 50], its ndv can't be 
larger than 50.
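
If it helps, a consistent variant might look like the following. This is 
just a sketch reusing the `attr` helper and `ColumnStat` entry style from 
the quoted suite, with hypothetical numbers:
```
// Hypothetical, consistent stats for an integer column: either cap the ndv
// at the width of the value range [1, 50] ...
attr("f1_fk1") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
  nullCount = 0, avgLen = 4, maxLen = 4)
// ... or widen the range to match ndv = 100.
attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
  nullCount = 0, avgLen = 4, maxLen = 4)
```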





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110083521
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
+  // Either star or non-star is empty
+  (starJoins.isEmpty || nonStarJoins.isEmpty ||
+// Join is a subset of the star-join
+join.subsetOf(starJoins) ||
+// Star-join is a subset of join
+starJoins.subsetOf(join) ||
--- End diff --

Can't a star join within a built join be reordered?





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110078548
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -327,3 +345,104 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + 
other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join 
enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid 
materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+  items: Seq[LogicalPlan],
+  conditions: Set[Expression],
+  planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+// Compute the tables in a star-schema relationship.
+val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+  val (starInt, nonStarInt) = planIndex.collect {
+case (p, i) if starJoin.contains(p) =>
+  (Some(i), None)
+case (p, i) if nonStarJoin.contains(p) =>
+  (None, Some(i))
+case _ =>
+  (None, None)
+  }.unzip
+  Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
+} else {
+  // Nothing interesting to return.
+  None
+}
+  }
+
+  /**
+   * Applies star-join filter.
+   *
+   * Given the outer/inner and the star/non-star sets,
+   * the following plan combinations are allowed:
+   * 1) (outer U inner) is a subset of star-join
+   * 2) star-join is a subset of (outer U inner)
+   * 3) (outer U inner) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   */
+  def starJoinFilter(
+  outer: Set[Int],
+  inner: Set[Int],
+  filters: JoinGraphInfo) : Boolean = {
+val starJoins = filters.starJoins
+val nonStarJoins = filters.nonStarJoins
+val join = outer.union(inner)
+
+// Disjoint sets
+outer.intersect(inner).isEmpty &&
--- End diff --

I think the check at the beginning of `buildJoin` already guarantees this.





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-05 Thread ioana-delaney
Github user ioana-delaney commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110076651
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with 
Logging {
   case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 
0))
 }.toMap)
 
+// Build filters from the join graph to be used by the search 
algorithm.
+val filters = JoinReorderDPFilters(conf).buildJoinGraphInfo(items, 
conditions, itemIndex)
+
 // Build plans for next levels until the last level has only one plan. 
This plan contains
 // all items that can be joined, so there's no need to continue.
 val topOutputSet = AttributeSet(output)
-while (foundPlans.size < items.length && foundPlans.last.size > 1) {
+while (foundPlans.size < items.length) {
--- End diff --

@wzhfy Condition "foundPlans.last.size > 1" does not apply when filters 
are used with the join enumeration, since not all plan combinations are 
generated; with filters, a level may contain a single plan even though more 
items remain to be joined. Without filters, I think the condition will always 
be satisfied, so I removed it completely. Let me know if you have a counterexample.
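
A concrete trace of why the old guard is too aggressive once filters prune 
the search space; the per-level plan counts below are read off the 
`starJoinFilter` doc comment quoted in this thread (a sketch, not a real run):
```
// Plans generated per level for the 6-item example graph, star filter on:
// level:           0  1  2  3  4  5
val plansPerLevel = Seq(6, 3, 1, 2, 2, 1)
val itemCount = 6

// The old guard "foundPlans.last.size > 1" stops before building the level
// that follows the first singleton level:
val stopsAfter = plansPerLevel.indices.drop(1).find(plansPerLevel(_) == 1)
println(s"old guard stops after level ${stopsAfter.get}") // level 2 -- too early

// The new guard "foundPlans.size < items.length" always builds all levels:
println(s"new guard builds levels 0 through ${itemCount - 1}")
```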





[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-05 Thread ioana-delaney
GitHub user ioana-delaney opened a pull request:

https://github.com/apache/spark/pull/17546

[SPARK-20233] [SQL] Apply star-join filter heuristics to dynamic 
programming join enumeration

## What changes were proposed in this pull request?

Implements star-join filter to reduce the search space for dynamic 
programming join enumeration. Consider the following join graph:

```
T1   D1 - T2 - T3
  \  /
   F1
   |
   D2

star-join: {F1, D1, D2}
non-star: {T1, T2, T3}
```
The following join combinations will be generated:
```
level 0: (F1), (D1), (D2), (T1), (T2), (T3)
level 1: {F1, D1}, {F1, D2}, {T2, T3}
level 2: {F1, D1, D2}
level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3}
level 5: {F1, D1, D2, T1, T2, T3}
```

## How was this patch tested?

New test suite ```StarJoinCostBasedReorderSuite.scala```.
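
For reference, the new suite exercises the feature under the following 
configuration; a minimal sketch that copies the constants quoted from the 
test earlier in this thread:
```
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._

// Configuration used by StarJoinCostBasedReorderSuite: CBO, join reordering,
// star-schema detection, and the DP star filter are all enabled.
val conf = new SQLConf().copy(
  CASE_SENSITIVE -> true,
  CBO_ENABLED -> true,
  JOIN_REORDER_ENABLED -> true,
  STARSCHEMA_DETECTION -> true,
  JOIN_REORDER_DP_STAR_FILTER -> true)
```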


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ioana-delaney/spark starSchemaCBOv3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17546


commit 5eb9b30d4d3ae4d6bc4d137af5ec09a876df805a
Author: Ioana Delaney 
Date:   2017-04-06T04:04:20Z

[SPARK-20233] Implement star-join filter for join enumeration.



