[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20988


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20988#discussion_r182858087
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table relation node.
    *
    * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
    * deterministic expressions, and returns result after reaching the partitioned table relation
    * node.
    */
-  object PartitionedRelation {
-
-    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-        if fsRelation.partitionSchema.nonEmpty =>
-        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        Some((AttributeSet(partAttrs), l))
-
-      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-        Some((AttributeSet(partAttrs), relation))
-
-      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
-        }
+  object PartitionedRelation extends PredicateHelper {
+
+    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
+      plan match {
+        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+          if fsRelation.partitionSchema.nonEmpty =>
+          val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+          Some((AttributeSet(partAttrs), Nil, l))
+
+        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+          val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+          Some((AttributeSet(partAttrs), Nil, relation))
+
+        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
+          }

-      case f @ Filter(condition, child) if condition.deterministic =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (f.references.subsetOf(partAttrs)) Some((partAttrs, relation)) else None
-        }
+        case f @ Filter(condition, child) if condition.deterministic =>
+          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+            if (f.references.subsetOf(partAttrs)) {
+              Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation))
--- End diff --

Good catch. I've added a test case and updated the `PartitionedRelation` code to keep track of both the original partition attributes -- which the collected filters reference -- and the top-most node's output, which is what the rule uses.

As for using `PhysicalOperation` instead of `PartitionedRelation`, I don't see a compelling reason for such an invasive change. This patch only adds a couple of results to `unapply` and keeps mostly the same logic. `PhysicalOperation` would lose the check that the references are a subset of the partition attributes, and it would be a much larger change. If you think this should be refactored, let's talk about that separately to understand the motivation.


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20988#discussion_r182673957
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table relation node.
    *
    * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
    * deterministic expressions, and returns result after reaching the partitioned table relation
    * node.
    */
-  object PartitionedRelation {
-
-    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-        if fsRelation.partitionSchema.nonEmpty =>
-        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        Some((AttributeSet(partAttrs), l))
-
-      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-        Some((AttributeSet(partAttrs), relation))
-
-      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
-        }
+  object PartitionedRelation extends PredicateHelper {
+
+    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
+      plan match {
+        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+          if fsRelation.partitionSchema.nonEmpty =>
+          val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+          Some((AttributeSet(partAttrs), Nil, l))
+
+        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+          val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+          Some((AttributeSet(partAttrs), Nil, relation))
+
+        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
+          }

-      case f @ Filter(condition, child) if condition.deterministic =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (f.references.subsetOf(partAttrs)) Some((partAttrs, relation)) else None
-        }
+        case f @ Filter(condition, child) if condition.deterministic =>
+          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+            if (f.references.subsetOf(partAttrs)) {
+              Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation))
--- End diff --

There is a bug here. Consider `Filter(x > 1, Project(p + 1 as x, Table(a, p, partitioned by p)))`: we would mistakenly report `x > 1` as a partition predicate, use it to list partitions, and fail.
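A hypothetical repro of this failure mode (a sketch assuming a SparkSession `spark`, the metadata-only optimization enabled, and illustrative table and column names; not from the PR):

```scala
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")

// A table with data column `a`, partitioned by `p` (illustrative setup).
spark.range(10).selectExpr("id AS a", "id % 3 AS p")
  .write.partitionBy("p").saveAsTable("t")

// Project defines x = p + 1, then Filter references x. Collecting `x > 1`
// as a partition predicate would break partition listing, since `x` is not
// a partition column of `t`.
spark.table("t")
  .selectExpr("p + 1 AS x")
  .filter("x > 1")
  .distinct()
  .show()
```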

I think we should use `PhysicalOperation` here, which can substitute the attributes in the filter for us.


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20988#discussion_r182579387
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table relation node.
    *
    * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
    * deterministic expressions, and returns result after reaching the partitioned table relation
    * node.
    */
-  object PartitionedRelation {
-
-    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-        if fsRelation.partitionSchema.nonEmpty =>
-        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        Some((AttributeSet(partAttrs), l))
-
-      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-        Some((AttributeSet(partAttrs), relation))
-
-      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
-        }
+  object PartitionedRelation extends PredicateHelper {
+
+    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
+      plan match {
+        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+          if fsRelation.partitionSchema.nonEmpty =>
+          val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+          Some((AttributeSet(partAttrs), Nil, l))
+
+        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+          val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+          Some((AttributeSet(partAttrs), Nil, relation))
+
+        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
--- End diff --

@cloud-fan, that is basically how this works already. Each matched node 
calls `unapply(child)` to get the result from the child node, then it adds the 
current node's conditions to that result. Using `unapply` instead of 
`getPartitionedRelation` makes this work in the matching rule:

```scala
  case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, filters, relation)) =>
```
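As a self-contained illustration of this recursive-extractor style (toy types invented here, not Spark's), each case re-invokes `unapply` on the child and threads the accumulated result upward:

```scala
sealed trait Plan
case class Table(partCols: Set[String]) extends Plan
case class Proj(cols: Set[String], child: Plan) extends Plan
case class Flt(cond: String, refs: Set[String], child: Plan) extends Plan

object PartRel {
  def unapply(plan: Plan): Option[(Set[String], List[String], Plan)] = plan match {
    case t @ Table(partCols) =>
      Some((partCols, Nil, t))
    case Proj(cols, child) =>
      // Recurse into the child, then keep the result only if this node's
      // references stay within the partition attributes.
      unapply(child).collect {
        case (partAttrs, filters, rel) if cols.subsetOf(partAttrs) =>
          (cols, filters, rel)
      }
    case Flt(cond, refs, child) =>
      // Same recursion, prepending this node's condition to the filters.
      unapply(child).collect {
        case (partAttrs, filters, rel) if refs.subsetOf(partAttrs) =>
          (partAttrs, cond :: filters, rel)
      }
  }
}

val plan = Flt("p > 0", Set("p"), Proj(Set("p"), Table(Set("p"))))
plan match {
  case PartRel(attrs, filters, _) => println(s"attrs=$attrs, filters=$filters")
  case _ => println("no partitioned relation")
}
```

The payoff is that last match: an `unapply` composes directly inside a pattern, which a plain helper method could not.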


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20988#discussion_r181535823
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table relation node.
    *
    * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
    * deterministic expressions, and returns result after reaching the partitioned table relation
    * node.
    */
-  object PartitionedRelation {
-
-    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-        if fsRelation.partitionSchema.nonEmpty =>
-        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        Some((AttributeSet(partAttrs), l))
-
-      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-        Some((AttributeSet(partAttrs), relation))
-
-      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
-        }
+  object PartitionedRelation extends PredicateHelper {
+
+    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
+      plan match {
+        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+          if fsRelation.partitionSchema.nonEmpty =>
+          val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+          Some((AttributeSet(partAttrs), Nil, l))
+
+        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+          val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+          Some((AttributeSet(partAttrs), Nil, relation))
+
+        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
--- End diff --

I'd propose something top-down, like:
```scala
def getPartitionedRelation(
    plan: LogicalPlan,
    predicates: Seq[Expression]): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
  plan match {
    case Filter(condition, child) if condition.deterministic =>
      getPartitionedRelation(child, predicates ++ splitConjunctivePredicates(condition))

    case Project(projectList, child) if projectList.forall(_.deterministic) =>
      getPartitionedRelation(child, predicates.filter(_.references.subsetOf(child.outputSet)))

    case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) if fsRelation.partitionSchema.nonEmpty =>
      val partAttrs = ...
      val partitionFilters = predicates.filter(_.references.subsetOf(partAttrs))
      Some(...)

    case _ => None
  }
}
```


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20988#discussion_r181535484
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table relation node.
    *
    * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
    * deterministic expressions, and returns result after reaching the partitioned table relation
    * node.
    */
-  object PartitionedRelation {
-
-    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-        if fsRelation.partitionSchema.nonEmpty =>
-        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        Some((AttributeSet(partAttrs), l))
-
-      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-        Some((AttributeSet(partAttrs), relation))
-
-      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
-        }
+  object PartitionedRelation extends PredicateHelper {
+
+    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
+      plan match {
+        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+          if fsRelation.partitionSchema.nonEmpty =>
+          val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+          Some((AttributeSet(partAttrs), Nil, l))
+
+        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+          val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+          Some((AttributeSet(partAttrs), Nil, relation))
+
+        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+          unapply(child).flatMap { case (partAttrs, filters, relation) =>
+            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
--- End diff --

What about `Filter(p > 1, Project(a, p, Table(a, b, p, partitioned by p)))`? `p > 1` should also be collected as a partition filter.
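Tracing this case through the bottom-up `unapply` with plain sets (a minimal sketch, not Spark code; names match the example above) shows why the predicate is lost:

```scala
val partAttrs = Set("p")        // partition attributes of the relation
val projectRefs = Set("a", "p") // references of Project(a, p, ...)
val filterRefs = Set("p")       // references of Filter(p > 1, ...)

// The Project case checks its own references against the partition
// attributes and returns None, so the Filter above it never contributes,
// even though `p > 1` touches only the partition column:
assert(!projectRefs.subsetOf(partAttrs)) // Project case bails out here
assert(filterRefs.subsetOf(partAttrs))   // `p > 1` alone would have qualified
```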


---




[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...

2018-04-05 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/20988

[SPARK-23877][SQL]: Use filter predicates to prune partitions in 
metadata-only queries

## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions 
when listing partitions, if there are filter nodes in the logical plan. This 
avoids listing all partitions for large tables on the driver.
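As a hypothetical illustration of the intended benefit (assuming a SparkSession `spark` and an illustrative table `events` partitioned by `ds` and `hr`):

```scala
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")

// A metadata-only aggregate over partition columns: with this change, the
// rule should only list partitions matching ds = '2018-04-01' in the
// catalog, instead of listing every partition of `events` on the driver.
spark.sql("SELECT max(hr) FROM events WHERE ds = '2018-04-01'").show()
```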

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack-level-too-deep error. The cause is serializing a Stream to executors: a Stream is a recursive structure of (head, rest-of-stream), so if it is long enough, serialization exceeds the maximum stack depth. The fix is to create the LocalRelation from an Array instead of the incoming Seq. A minimal sketch of the failure mode follows.
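The sketch below illustrates the underlying Stream-serialization hazard and the Array fix (a standalone illustration with assumed names, not the PR's code):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Default Java serialization walks an evaluated Stream's cons cells
// recursively, growing the stack with each element, so a long stream can
// overflow the stack.
def javaSerialize(obj: AnyRef): Int = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(obj)
  out.close()
  buffer.size()
}

// `force` fully evaluates the stream, like a materialized partition listing.
val partitionValues = Stream.range(0, 1000000).force
// javaSerialize(partitionValues)              // may throw StackOverflowError
javaSerialize(partitionValues.toArray)         // an Array is flat, so this is safe
```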

## How was this patch tested?

Existing tests for metadata-only queries.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark SPARK-23877-metadata-only-push-filters

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20988.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20988


commit 552efaee05b64f9ed4d5496b3b1d11b57b985f85
Author: Ryan Blue 
Date:   2018-03-14T21:50:11Z

Support filter conditions in metadata-only queries.

commit 2345896288828aefe14ebcb370d374b348400cf4
Author: Ryan Blue 
Date:   2018-03-14T22:43:56Z

Ensure partition data is an Array.

The LocalRelation created for partition data for metadata-only queries
may be a stream produced by listing directories. If the stream is large,
serializing the LocalRelation to executors results in a stack overflow
because the stream is a recursive structure of (head, rest-of-stream).




---
