[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20988 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r182858087

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---

    @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
       /**
        * A pattern that finds the partitioned table relation node inside the given plan, and returns a
    -   * pair of the partition attributes and the table relation node.
    +   * pair of the partition attributes, partition filters, and the table relation node.
        *
        * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
        * deterministic expressions, and returns result after reaching the partitioned table relation
        * node.
        */
    -  object PartitionedRelation {
    -
    -    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
    -      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
    -        if fsRelation.partitionSchema.nonEmpty =>
    -        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
    -        Some((AttributeSet(partAttrs), l))
    -
    -      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
    -        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
    -        Some((AttributeSet(partAttrs), relation))
    -
    -      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
    -        unapply(child).flatMap { case (partAttrs, relation) =>
    -          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
    -        }
    +  object PartitionedRelation extends PredicateHelper {
    +
    +    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
    +      plan match {
    +        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
    +          if fsRelation.partitionSchema.nonEmpty =>
    +          val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
    +          Some((AttributeSet(partAttrs), Nil, l))
    +
    +        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
    +          val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
    +          Some((AttributeSet(partAttrs), Nil, relation))
    +
    +        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
    +          unapply(child).flatMap { case (partAttrs, filters, relation) =>
    +            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None
    +          }

    -      case f @ Filter(condition, child) if condition.deterministic =>
    -        unapply(child).flatMap { case (partAttrs, relation) =>
    -          if (f.references.subsetOf(partAttrs)) Some((partAttrs, relation)) else None
    -        }
    +        case f @ Filter(condition, child) if condition.deterministic =>
    +          unapply(child).flatMap { case (partAttrs, filters, relation) =>
    +            if (f.references.subsetOf(partAttrs)) {
    +              Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation))

--- End diff --

Good catch. I've added a test case and updated the `PartitionedRelation` code to keep track of both the original partition attributes, which the filter needs, and the top-most node's output, which the rule uses.

As for using `PhysicalOperation` instead of `PartitionedRelation`, I don't see a compelling reason for such an invasive change. This currently adds a couple of results to `unapply` and keeps mostly the same logic. `PhysicalOperation` would lose the check that the references are a subset of the partition attributes, and it would be a much larger change. If you think this should be refactored, let's talk about that separately to understand the motivation for the change.
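A minimal toy sketch of the approach described in this reply follows. Everything in it is hypothetical and is not Spark code: `Expr`, `Plan`, `Table`, `Project`, `Filter`, and `TrackingPartitionedRelation` are simplified stand-ins for Catalyst expressions, logical plans, and the rule's extractor; string column names stand in for Catalyst's unique attribute IDs; and determinism checks are omitted. The extractor returns both the table's original partition columns, which a `Filter` is checked against, and the top-most node's output, which a `Project` above must reference.

```scala
// Hypothetical stand-ins for Catalyst expressions; names play the role of attribute IDs.
sealed trait Expr { def refs: Set[String] }
final case class Col(name: String) extends Expr { def refs: Set[String] = Set(name) }
final case class Lit(value: Int) extends Expr { def refs: Set[String] = Set.empty }
final case class Add(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class Gt(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

// Hypothetical stand-ins for logical plan nodes.
sealed trait Plan { def output: Set[String] }
final case class Table(cols: Seq[String], partCols: Seq[String]) extends Plan {
  def output: Set[String] = cols.toSet
}
final case class Project(list: Seq[(String, Expr)], child: Plan) extends Plan {
  def output: Set[String] = list.map(_._1).toSet
  def refs: Set[String] = list.flatMap(_._2.refs).toSet
}
final case class Filter(cond: Expr, child: Plan) extends Plan {
  def output: Set[String] = child.output
}

object TrackingPartitionedRelation {
  // a AND b AND c => Seq(a, b, c)
  private def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Result: (table partition columns, top-most output, partition filters, relation).
  def unapply(plan: Plan): Option[(Set[String], Set[String], Seq[Expr], Table)] = plan match {
    case t @ Table(_, partCols) if partCols.nonEmpty =>
      Some((partCols.toSet, partCols.toSet, Nil, t))
    case p @ Project(_, child) =>
      // A Project may only reference what its child exposes; its output becomes the new "attrs".
      unapply(child).flatMap { case (partAttrs, attrs, filters, rel) =>
        if (p.refs.subsetOf(attrs)) Some((partAttrs, p.output, filters, rel)) else None
      }
    case Filter(cond, child) =>
      // A Filter must reference the table's own partition columns, never aliases above them.
      unapply(child).flatMap { case (partAttrs, attrs, filters, rel) =>
        if (cond.refs.subsetOf(partAttrs)) Some((partAttrs, attrs, splitConjuncts(cond) ++ filters, rel))
        else None
      }
    case _ => None
  }
}
```

With the aliased plan `Filter(x > 1, Project(p + 1 AS x, Table(a, p)))`, the `Filter` case sees that `x` is not one of the table's partition columns, so the pattern does not match and `x > 1` is never used to list partitions.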
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r182673957

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---

    @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
    [...]
    +              Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation))

--- End diff --

There is a bug here. Consider `Filter(x > 1, Project(p + 1 as x, Table(a, p, partitioned by p)))`: we would mistakenly report `x > 1` as a partition predicate, use it to list partitions, and fail. I think we should use `PhysicalOperation` here, which can help us substitute the attributes in the filter.
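The failure described in this comment can be reproduced with a toy model of the bottom-up extractor from the diff. This is a sketch, not Spark code: `Expr`, `Plan`, `Table`, `Project`, and `Filter` are hypothetical simplifications of the Catalyst classes, string column names stand in for attribute IDs, and determinism checks are omitted. The key step is that the `Project` case returns its own output set as the new partition attributes, so the alias `x` then passes the `Filter` check.

```scala
// Hypothetical stand-ins for Catalyst expressions; names play the role of attribute IDs.
sealed trait Expr { def refs: Set[String] }
final case class Col(name: String) extends Expr { def refs: Set[String] = Set(name) }
final case class Lit(value: Int) extends Expr { def refs: Set[String] = Set.empty }
final case class Add(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class Gt(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

// Hypothetical stand-ins for logical plan nodes.
sealed trait Plan { def output: Set[String] }
final case class Table(cols: Seq[String], partCols: Seq[String]) extends Plan {
  def output: Set[String] = cols.toSet
}
final case class Project(list: Seq[(String, Expr)], child: Plan) extends Plan {
  def output: Set[String] = list.map(_._1).toSet
  def refs: Set[String] = list.flatMap(_._2.refs).toSet
}
final case class Filter(cond: Expr, child: Plan) extends Plan {
  def output: Set[String] = child.output
}

// Mirrors the bottom-up extractor in the diff, minus determinism checks.
object PartitionedRelation {
  private def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  def unapply(plan: Plan): Option[(Set[String], Seq[Expr], Table)] = plan match {
    case t @ Table(_, partCols) if partCols.nonEmpty =>
      Some((partCols.toSet, Nil, t))
    case p @ Project(_, child) =>
      unapply(child).flatMap { case (partAttrs, filters, rel) =>
        // The Project's output set replaces the partition attributes from here up.
        if (p.refs.subsetOf(partAttrs)) Some((p.output, filters, rel)) else None
      }
    case Filter(cond, child) =>
      unapply(child).flatMap { case (partAttrs, filters, rel) =>
        if (cond.refs.subsetOf(partAttrs)) Some((partAttrs, splitConjuncts(cond) ++ filters, rel))
        else None
      }
    case _ => None
  }
}
```

In this model, `PartitionedRelation.unapply` on `Filter(x > 1, Project(p + 1 AS x, Table(a, p)))` yields the filter `x > 1`, which references no column of the underlying table.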
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r182579387

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---

    @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
    [...]
    +            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None

--- End diff --

@cloud-fan, that is basically how this works already. Each matched node calls `unapply(child)` to get the result from the child node, then adds the current node's conditions to that result. Using `unapply` instead of `getPartitionedRelation` makes this work in the matching rule:

```scala
case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, filters, relation)) =>
```
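To illustrate why an extractor object is convenient here, the toy sketch below nests a `PartitionedRelation` pattern inside an `Aggregate` pattern, in the spirit of the `case` clause above. All classes are hypothetical simplifications (not Catalyst), the `Project` case is omitted for brevity, and the guard that aggregates only reference partition columns is an assumption added for illustration.

```scala
// Hypothetical stand-ins for Catalyst expressions; names play the role of attribute IDs.
sealed trait Expr { def refs: Set[String] }
final case class Col(name: String) extends Expr { def refs: Set[String] = Set(name) }
final case class Lit(value: Int) extends Expr { def refs: Set[String] = Set.empty }
final case class Gt(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

// Hypothetical plan nodes; Aggregate only records the columns its aggregates reference.
sealed trait Plan
final case class Table(cols: Seq[String], partCols: Seq[String]) extends Plan
final case class Filter(cond: Expr, child: Plan) extends Plan
final case class Aggregate(aggRefs: Seq[String], child: Plan) extends Plan

// Bottom-up extractor in the style of the diff (Project case omitted).
object PartitionedRelation {
  private def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  def unapply(plan: Plan): Option[(Set[String], Seq[Expr], Table)] = plan match {
    case t @ Table(_, partCols) if partCols.nonEmpty => Some((partCols.toSet, Nil, t))
    case Filter(cond, child) =>
      unapply(child).flatMap { case (partAttrs, filters, rel) =>
        if (cond.refs.subsetOf(partAttrs)) Some((partAttrs, splitConjuncts(cond) ++ filters, rel))
        else None
      }
    case _ => None
  }
}

object MetadataOnlyRuleSketch {
  // Because PartitionedRelation defines unapply, it nests inside the Aggregate pattern
  // and binds the partition attributes and collected filters in a single case clause.
  def partitionFilters(plan: Plan): Option[Seq[Expr]] = plan match {
    case Aggregate(aggRefs, PartitionedRelation(partAttrs, filters, _))
        if aggRefs.toSet.subsetOf(partAttrs) =>
      Some(filters)
    case _ => None
  }
}
```

A top-down helper such as `getPartitionedRelation(plan, predicates)` cannot appear inside a pattern like this, which is the trade-off the comment points at.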
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r181535823

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---

    @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
    [...]
    +            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None

--- End diff --

I'd propose something top-down, like:

```
def getPartitionedRelation(
    plan: LogicalPlan,
    predicates: Seq[Expression]): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
  plan match {
    case Filter(condition, child) if condition.deterministic =>
      getPartitionedRelation(child, predicates ++ splitConjunctivePredicates(condition))

    case Project(projectList, child) if projectList.forall(_.deterministic) =>
      getPartitionedRelation(child, predicates.filter(_.references.subsetOf(child.outputSet)))

    case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
        if fsRelation.partitionSchema.nonEmpty =>
      val partAttrs = ...
      val partitionFilters = predicates.filter(_.references.subsetOf(partAttrs))
      Some(...)

    case _ => None
  }
}
```
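The top-down proposal can be made runnable in a toy model. Everything here is hypothetical: the `Expr`/`Plan` classes simplify Catalyst, string names stand in for attribute IDs (so a `Project` that aliases a new expression to an existing column name is not modeled correctly), determinism checks are omitted, and because the proposal elides `partAttrs` and the returned tuple, this sketch assumes they are the table's partition columns and `(partAttrs, partitionFilters, relation)`.

```scala
// Hypothetical stand-ins for Catalyst expressions; names play the role of attribute IDs.
sealed trait Expr { def refs: Set[String] }
final case class Col(name: String) extends Expr { def refs: Set[String] = Set(name) }
final case class Lit(value: Int) extends Expr { def refs: Set[String] = Set.empty }
final case class Add(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class Gt(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

// Hypothetical stand-ins for logical plan nodes.
sealed trait Plan { def output: Set[String] }
final case class Table(cols: Seq[String], partCols: Seq[String]) extends Plan {
  def output: Set[String] = cols.toSet
}
final case class Project(list: Seq[(String, Expr)], child: Plan) extends Plan {
  def output: Set[String] = list.map(_._1).toSet
}
final case class Filter(cond: Expr, child: Plan) extends Plan {
  def output: Set[String] = child.output
}

object TopDownSketch {
  private def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Collect predicates on the way down; at the relation keep those on partition columns.
  def getPartitionedRelation(
      plan: Plan,
      predicates: Seq[Expr]): Option[(Set[String], Seq[Expr], Table)] = plan match {
    case Filter(cond, child) =>
      getPartitionedRelation(child, predicates ++ splitConjuncts(cond))
    case Project(_, child) =>
      // Drop predicates on columns this Project introduces (for example, aliases).
      getPartitionedRelation(child, predicates.filter(_.refs.subsetOf(child.output)))
    case t @ Table(_, partCols) if partCols.nonEmpty =>
      val partAttrs = partCols.toSet
      Some((partAttrs, predicates.filter(_.refs.subsetOf(partAttrs)), t))
    case _ => None
  }
}
```

In this model the top-down version keeps `p > 1` through `Project(a, p, ...)` and drops `x > 1` at the `Project` that defines `x`, which covers both plans raised in this review.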
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r181535484

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---

    @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
    [...]
    +            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None

--- End diff --

What about `Filter(p > 1, Project(a, p, Table(a, b, p, partitioned by p)))`? `p > 1` should also be a partition filter.
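In a toy model of the bottom-up extractor from the diff (hypothetical `Expr`/`Plan` classes with column names standing in for attribute IDs, not Spark code), this plan is rejected outright: the `Project` references `a`, which is not a partition column, so `unapply` returns `None` and `p > 1` is never collected, even though the same `Filter` placed directly on the table is.

```scala
// Hypothetical stand-ins for Catalyst expressions; names play the role of attribute IDs.
sealed trait Expr { def refs: Set[String] }
final case class Col(name: String) extends Expr { def refs: Set[String] = Set(name) }
final case class Lit(value: Int) extends Expr { def refs: Set[String] = Set.empty }
final case class Gt(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
final case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

// Hypothetical stand-ins for logical plan nodes.
sealed trait Plan { def output: Set[String] }
final case class Table(cols: Seq[String], partCols: Seq[String]) extends Plan {
  def output: Set[String] = cols.toSet
}
final case class Project(list: Seq[(String, Expr)], child: Plan) extends Plan {
  def output: Set[String] = list.map(_._1).toSet
  def refs: Set[String] = list.flatMap(_._2.refs).toSet
}
final case class Filter(cond: Expr, child: Plan) extends Plan {
  def output: Set[String] = child.output
}

// Mirrors the bottom-up extractor in the diff, minus determinism checks.
object PartitionedRelation {
  private def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  def unapply(plan: Plan): Option[(Set[String], Seq[Expr], Table)] = plan match {
    case t @ Table(_, partCols) if partCols.nonEmpty =>
      Some((partCols.toSet, Nil, t))
    case p @ Project(_, child) =>
      unapply(child).flatMap { case (partAttrs, filters, rel) =>
        // Project(a, p) references `a`, which is not a partition column, so this fails.
        if (p.refs.subsetOf(partAttrs)) Some((p.output, filters, rel)) else None
      }
    case Filter(cond, child) =>
      unapply(child).flatMap { case (partAttrs, filters, rel) =>
        if (cond.refs.subsetOf(partAttrs)) Some((partAttrs, splitConjuncts(cond) ++ filters, rel))
        else None
      }
    case _ => None
  }
}
```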
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/20988

[SPARK-23877][SQL]: Use filter predicates to prune partitions in metadata-only queries

## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions of large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack overflow error. The cause is that a stream is serialized to executors, and a stream is a recursive structure: if it is too long, serialization exceeds the maximum stack depth. The fix is to create the LocalRelation from an Array instead of the incoming Seq.

## How was this patch tested?

Existing tests for metadata-only queries.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-23877-metadata-only-push-filters

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20988.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20988

commit 552efaee05b64f9ed4d5496b3b1d11b57b985f85
Author: Ryan Blue
Date: 2018-03-14T21:50:11Z

    Support filter conditions in metadata-only queries.

commit 2345896288828aefe14ebcb370d374b348400cf4
Author: Ryan Blue
Date: 2018-03-14T22:43:56Z

    Ensure partition data is an Array.

    The LocalRelation created for partition data for metadata-only queries may be a stream produced by listing directories. If the stream is large, serializing the LocalRelation to executors results in a stack overflow because the stream is a recursive structure of (head, rest-of-stream).
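The effect of pushing filters into partition listing can be sketched with a toy catalog. `PartitionSpec`, `ToyCatalog`, and `MetadataOnlySketch` are hypothetical: the filters are plain Scala functions rather than Catalyst expressions, and `returned` simply counts how many partition specs come back to the caller. The query answered is `SELECT DISTINCT p FROM t WHERE p > 1`, computed from partition values alone.

```scala
// Hypothetical partition spec: partition column name -> value.
final case class PartitionSpec(values: Map[String, Int])

// Hypothetical catalog that can apply partition filters while listing,
// standing in for a metastore or file index that prunes before returning results.
final class ToyCatalog(partitions: Seq[PartitionSpec]) {
  // Number of partition specs handed back to the caller (the "driver").
  var returned: Int = 0

  def listPartitions(filters: Seq[PartitionSpec => Boolean]): Seq[PartitionSpec] = {
    val matching = partitions.filter(spec => filters.forall(f => f(spec)))
    returned += matching.size
    matching
  }
}

object MetadataOnlySketch {
  // Answers `SELECT DISTINCT p FROM t WHERE p > 1` from partition values alone.
  def distinctP(catalog: ToyCatalog, pushFilters: Boolean): Seq[Int] = {
    val pGreaterThan1: PartitionSpec => Boolean = _.values("p") > 1
    val listed =
      if (pushFilters) catalog.listPartitions(Seq(pGreaterThan1)) // prune while listing
      else catalog.listPartitions(Nil).filter(pGreaterThan1)      // list everything, then filter
    listed.map(_.values("p")).distinct
  }
}
```

Both variants return the same answer; the difference is that the pushed-down version only brings back partitions that can match, which is what matters for tables with many partitions.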
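The serialization problem described in the second commit can be illustrated with plain Java serialization. `Cell` is a hypothetical recursive cons cell standing in for a stream's (head, rest-of-stream) structure; whether a particular Scala collection serializes recursively depends on its implementation and Scala version, so this sketch does not use `Stream` itself. Default serialization descends one level per nested `tail`, so a long chain overflows the stack, while an `Array` of the same length is written in a loop. Catching `StackOverflowError` is done here only to make the failure observable.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical recursive list cell, standing in for a stream's
// (head, rest-of-stream) structure. A null tail marks the end.
final case class Cell(head: Int, tail: Cell)

object SerializationDepthDemo {
  // Java-serializes an object graph into memory.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  // Builds a chain of n cells iteratively, so construction itself is not recursive.
  def chain(n: Int): Cell = {
    var acc: Cell = null
    var i = n - 1
    while (i >= 0) { acc = Cell(i, acc); i -= 1 }
    acc
  }

  // Default serialization recurses once per nested `tail`, so a long chain
  // exhausts the thread stack; a flat array is written without that recursion.
  def overflows(obj: AnyRef): Boolean =
    try { serialize(obj); false }
    catch { case _: StackOverflowError => true }
}
```

Converting the rows to an `Array` before building the relation, as the commit does, keeps serialization depth constant regardless of how many partitions were listed.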