cloud-fan commented on a change in pull request #32049: URL: https://github.com/apache/spark/pull/32049#discussion_r668104749
########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Aggregation; + +/** + * A mix-in interface for {@link ScanBuilder}. Data source can implement this interface to + * push down aggregates. Depends on the data source implementation, the aggregates may not + * be able to push down, or partially push down and have final aggregate at Spark. Review comment: let's clearly define what `partially push down` means in the doc, with examples. ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Aggregation; + +/** + * A mix-in interface for {@link ScanBuilder}. Data source can implement this interface to + * push down aggregates. Depends on the data source implementation, the aggregates may not + * be able to push down, or partially push down and have final aggregate at Spark. + * + * When pushing down operators, Spark pushes down filters to the data source first, then push down + * aggregates or apply column pruning. Depends on data source implementation, aggregates may or + * may not be able to be pushed down with filters. If pushed filters still need to be evaluated + * after scanning, aggregates can't be pushed down. + * + * @since 3.2.0 + */ +@Evolving +public interface SupportsPushDownAggregates extends ScanBuilder { + + /** + * Pushes down Aggregation to datasource. 
The order of the datasource scan output is: Review comment: `is` -> `should be` ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Aggregation; + +/** + * A mix-in interface for {@link ScanBuilder}. Data source can implement this interface to + * push down aggregates. Depends on the data source implementation, the aggregates may not + * be able to push down, or partially push down and have final aggregate at Spark. + * + * When pushing down operators, Spark pushes down filters to the data source first, then push down + * aggregates or apply column pruning. Depends on data source implementation, aggregates may or + * may not be able to be pushed down with filters. If pushed filters still need to be evaluated + * after scanning, aggregates can't be pushed down. + * + * @since 3.2.0 + */ +@Evolving +public interface SupportsPushDownAggregates extends ScanBuilder { + + /** + * Pushes down Aggregation to datasource. The order of the datasource scan output is: Review comment: `... output columns is` -> `... output columns should be` ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Aggregation; + +/** + * A mix-in interface for {@link ScanBuilder}. Data source can implement this interface to + * push down aggregates. Depends on the data source implementation, the aggregates may not + * be able to push down, or partially push down and have final aggregate at Spark. 
+ * + * When pushing down operators, Spark pushes down filters to the data source first, then push down + * aggregates or apply column pruning. Depends on data source implementation, aggregates may or + * may not be able to be pushed down with filters. If pushed filters still need to be evaluated + * after scanning, aggregates can't be pushed down. + * + * @since 3.2.0 + */ +@Evolving +public interface SupportsPushDownAggregates extends ScanBuilder { + + /** + * Pushes down Aggregation to datasource. The order of the datasource scan output is: Review comment: `... output is` -> `... output columns should be` ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) Review comment: since `project.forall(_.isInstanceOf[AttributeReference])`, I don't think we need `getAggregateExpression` to de-alias any more. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and 
return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { Review comment: Is it possible to push down group-only aggregates? Seems we should check if both group cols and agg exprs are empty. 
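A minimal sketch of the check this comment suggests, assuming the `Aggregation` returned by `PushDownUtils.pushAggregates` exposes its group-by columns through a `groupByColumns` accessor (the actual field name in this PR may differ); this is an illustration of the idea, not the PR's implementation:

```scala
// Give up on the rewrite only when nothing at all was pushed down: neither
// aggregate functions nor group-by columns. A group-only aggregate pushes no
// aggregate functions but is still worth pushing.
if (pushedAggregates.aggregateExpressions.isEmpty &&
    pushedAggregates.groupByColumns.isEmpty) {
  aggNode // return the original plan node, nothing was pushed
} else {
  // continue building the pushed-down scan relation as in the hunk above
}
```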
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { Review comment: or `PushDownUtils.pushAggregates` should return `Option[Aggregation]`, and we can remove `Aggregation.empty` ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = 
DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { + aggNode // return original plan node + } else { + // use the aggregate columns as the output columns + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; Review comment: can we also include group by cols in the example? 
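One possible way to extend the code-comment example with a group-by column, as the comment asks; this is purely illustrative (attribute ids are made up, and the exact column order of the scan output is whatever the interface doc mandates):

```scala
// e.g. TABLE t (c1 INT, c2 INT, c3 INT)
//      SELECT c2, min(c1), max(c1) FROM t GROUP BY c2;
// Use c2, min(c1), max(c1) as the DataSourceV2ScanRelation output, and keep a final
// merging aggregate on top of the pushed-down scan:
// == Optimized Logical Plan ==
// Aggregate [c2#10], [c2#10, min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18]
// +- RelationV2[c2#10, min(c1)#21, max(c1)#22] parquet file ...
```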
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,179 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.catalyst.util.toPrettySQL +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case filter @ Filter(_, sHolder: ScanBuilderHolder) => + val (filters, _, _) = collectFilters(filter).get + + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode@Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) => + sHolder.builder match { + case r: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { + aggNode // return original plan node + } else { + // use the aggregate columns as the output columns + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // Use min(c1), max(c1) as output for DataSourceV2ScanRelation + // We want to have the following logical plan: + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... + val output = aggregates.map { + case agg: AggregateExpression => + AttributeReference(toPrettySQL(agg), agg.dataType)() + } + + // No need to do column pruning because only the aggregate columns are used as + // DataSourceV2ScanRelation output columns. All the other columns are not + // included in the output. Since PushDownUtils.pruneColumns is not called, + // ScanBuilder.requiredSchema is not pruned, but ScanBuilder.requiredSchema is + // not used anyways. The schema for aggregate columns will be built in Scan. + val scan = sHolder.builder.build() + + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Aggregate Functions: + | ${pushedAggregates.aggregateExpressions.mkString(", ")} + |Output: ${output.mkString(", ")} + """.stripMargin) + + val scanRelation = DataSourceV2ScanRelation(sHolder.relation, scan, output) + val plan = Aggregate(groupingExpressions, resultExpressions, scanRelation) + + // Change the optimized logical plan to reflect the pushed down aggregate + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // The original logical plan is + // Aggregate [min(c1#9) AS min(c1)#17, max(c1#9) AS max(c1)#18] + // +- RelationV2[c1#9] parquet ... + // + // After change the V2ScanRelation output to [min(_1)#21, max(_1)#22] + // we have the following + // !Aggregate [min(_1#9) AS min(_1)#17, max(_1#9) AS max(_1)#18] + // +- RelationV2[min(_1)#21, max(_1)#22] parquet ... + // + // We want to change it to + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... 
+ var i = 0 + plan.transformExpressions { Review comment: nvm, the output is kept: https://github.com/apache/spark/pull/32049/files#diff-cf61a7ed875ef854164cd3d741e5adc4c1694c05b1bb3bdc19dd5c8dc5bca8bdR117 ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { + aggNode // return original plan node + } else { + // use the aggregate columns as the output columns + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // Use min(c1), max(c1) as output for DataSourceV2ScanRelation + // We want to have the following logical plan: + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... + var index = 0 + val output = resultExpressions.map { + case Alias(_, name) => + index = index + 1 + AttributeReference(name, aggregates(index - 1).dataType)() + case a: AttributeReference => a + } + + // No need to do column pruning because only the aggregate columns are used as + // DataSourceV2ScanRelation output columns. All the other columns are not + // included in the output. Since PushDownUtils.pruneColumns is not called, + // ScanBuilder.requiredSchema is not pruned, but ScanBuilder.requiredSchema is + // not used anyways. The schema for aggregate columns will be built in Scan. + val scan = sHolder.builder.build() Review comment: can we make sure `scan.readSchema` matches the `output` constructed above? 
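A small, self-contained sketch of the sanity check this comment asks for; the helper name is hypothetical and not part of the PR:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StructType

// Verify that the schema reported by the pushed-down Scan lines up, column by column,
// with the output attributes constructed by the rule.
def schemaMatchesOutput(readSchema: StructType, output: Seq[AttributeReference]): Boolean = {
  readSchema.length == output.length &&
    readSchema.fields.zip(output).forall { case (field, attr) =>
      field.name == attr.name && field.dataType == attr.dataType
    }
}
```

It could be invoked right after `sHolder.builder.build()`, e.g. `assert(schemaMatchesOutput(scan.readSchema(), output))`, to fail fast when a data source builds a scan whose schema does not match the expected aggregate output.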
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { + aggNode // return original plan node + } else { + // use the aggregate columns as the output columns + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // Use min(c1), max(c1) as output for DataSourceV2ScanRelation + // We want to have the following logical plan: + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... + var index = 0 + val output = resultExpressions.map { + case Alias(_, name) => Review comment: can't we simply do: `resultExpressions.map(_.toAttribute)`? 
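A sketch of the simplification suggested here. `NamedExpression.toAttribute` returns the attribute the expression is bound to, so the index bookkeeping disappears; note, however, the follow-up comment below, which points out this only gives the right scan output when each result expression is itself an aliased aggregate:

```scala
// Suggested simplification (sketch): derive the scan output directly from the
// result expressions instead of manually tracking an index.
val output = resultExpressions.map(_.toAttribute)
// Caveat: for expressions over aggregates, e.g. SELECT max(c) + min(c) AS res FROM t,
// this yields res#id rather than the pushed max(c)#id / min(c)#id columns.
```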
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { + aggNode // return original plan node + } else { + // use the aggregate columns as the output columns + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // Use min(c1), max(c1) as output for DataSourceV2ScanRelation + // We want to have the following logical plan: + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... + var index = 0 + val output = resultExpressions.map { + case Alias(_, name) => Review comment: is this corrected? if the query is `SELECT max(c) + min(c) as res FROM t`, what we push down is `max(c)` and `min(c)`, and the expected output of the scan relation should be `max(c)#id` and `min(c)#id`, instead of `res#id`. 
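One way to build the scan output from the pushed aggregates rather than from `resultExpressions`, which is what this comment argues for; the shape below mirrors the `toPrettySQL`-based construction that appears in a later revision of the same hunk quoted elsewhere in this thread:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.util.toPrettySQL

// For SELECT max(c) + min(c) AS res FROM t, this produces max(c)#id and min(c)#id
// as the scan relation output, leaving the final projection to compute res.
val output = aggregates.map { case agg: AggregateExpression =>
  AttributeReference(toPrettySQL(agg), agg.dataType)()
}
```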
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) Review comment: since `project.forall(_.isInstanceOf[AttributeReference])`, I don't think we need `getAggregateExpression` to de-alias any more. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -83,16 +206,35 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] { } else { withFilter } - withProjection } + + private def getAggregateExpression( + resultExpressions: Seq[NamedExpression], + project: Seq[NamedExpression], + sHolder: ScanBuilderHolder): Seq[AggregateExpression] = { + val aggregates = resultExpressions.flatMap { expr => + expr.collect { + case agg: AggregateExpression => + replaceAlias(agg, getAliasMap(project)).asInstanceOf[AggregateExpression] Review comment: since `project.forall(_.isInstanceOf[AttributeReference])`, I don't think we need to de-alias any more. 
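A sketch of `getAggregateExpression` with the de-aliasing step dropped, as this comment suggests; since the project list contains only `AttributeReference`s there are no aliases to resolve, so the `project` parameter and `replaceAlias` call can go away (fragment of the rule object, not a standalone snippet):

```scala
private def getAggregateExpression(
    resultExpressions: Seq[NamedExpression],
    sHolder: ScanBuilderHolder): Seq[AggregateExpression] = {
  // Collect the aggregate expressions directly; no alias substitution is needed.
  val aggregates = resultExpressions.flatMap { expr =>
    expr.collect { case agg: AggregateExpression => agg }
  }
  DataSourceStrategy.normalizeExprs(aggregates, sHolder.relation.output)
    .asInstanceOf[Seq[AggregateExpression]]
}
```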
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,179 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.catalyst.util.toPrettySQL +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case filter @ Filter(_, sHolder: ScanBuilderHolder) => + val (filters, _, _) = collectFilters(filter).get + + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode@Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) => + sHolder.builder match { + case r: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { + aggNode // return original plan node + } else { + // use the aggregate columns as the output columns + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // Use min(c1), max(c1) as output for DataSourceV2ScanRelation + // We want to have the following logical plan: + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... + val output = aggregates.map { + case agg: AggregateExpression => + AttributeReference(toPrettySQL(agg), agg.dataType)() + } + + // No need to do column pruning because only the aggregate columns are used as + // DataSourceV2ScanRelation output columns. All the other columns are not + // included in the output. Since PushDownUtils.pruneColumns is not called, + // ScanBuilder.requiredSchema is not pruned, but ScanBuilder.requiredSchema is + // not used anyways. The schema for aggregate columns will be built in Scan. + val scan = sHolder.builder.build() + + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Aggregate Functions: + | ${pushedAggregates.aggregateExpressions.mkString(", ")} + |Output: ${output.mkString(", ")} + """.stripMargin) + + val scanRelation = DataSourceV2ScanRelation(sHolder.relation, scan, output) + val plan = Aggregate(groupingExpressions, resultExpressions, scanRelation) + + // Change the optimized logical plan to reflect the pushed down aggregate + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // The original logical plan is + // Aggregate [min(c1#9) AS min(c1)#17, max(c1#9) AS max(c1)#18] + // +- RelationV2[c1#9] parquet ... + // + // After change the V2ScanRelation output to [min(_1)#21, max(_1)#22] + // we have the following + // !Aggregate [min(_1#9) AS min(_1)#17, max(_1#9) AS max(_1)#18] + // +- RelationV2[min(_1)#21, max(_1)#22] parquet ... + // + // We want to change it to + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... 
+ var i = 0 + plan.transformExpressions { Review comment: nvm, the output is kept: https://github.com/apache/spark/pull/32049/files#diff-cf61a7ed875ef854164cd3d741e5adc4c1694c05b1bb3bdc19dd5c8dc5bca8bdR117 ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -83,16 +206,35 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] { } else { withFilter } - withProjection } + + private def getAggregateExpression( + resultExpressions: Seq[NamedExpression], + project: Seq[NamedExpression], + sHolder: ScanBuilderHolder): Seq[AggregateExpression] = { + val aggregates = resultExpressions.flatMap { expr => + expr.collect { + case agg: AggregateExpression => + replaceAlias(agg, getAliasMap(project)).asInstanceOf[AggregateExpression] + } + } + DataSourceStrategy.normalizeExprs(aggregates, sHolder.relation.output) + .asInstanceOf[Seq[AggregateExpression]] + } } +case class ScanBuilderHolder( + output: Seq[AttributeReference], + relation: DataSourceV2Relation, + builder: ScanBuilder) extends LeafNode + // A wrapper for v1 scan to carry the translated filters and the handled ones. This is required by // the physical v1 scan node. case class V1ScanWrapper( v1Scan: V1Scan, translatedFilters: Seq[sources.Filter], - handledFilters: Seq[sources.Filter]) extends Scan { + handledFilters: Seq[sources.Filter], + pushedAggregates: Aggregation) extends Scan { Review comment: why do we put it here if we are not able to support it? ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala ########## @@ -70,6 +72,36 @@ object PushDownUtils extends PredicateHelper { } } + /** + * Pushes down aggregates to the data source reader + * + * @return pushed aggregation. + */ + def pushAggregates( + scanBuilder: ScanBuilder, + aggregates: Seq[AggregateExpression], + groupBy: Seq[Expression]): Aggregation = { + + def columnAsString(e: Expression): Option[FieldReference] = e match { + case AttributeReference(name, _, _, _) => Some(FieldReference(Seq(name))) + case _ => None + } + + scanBuilder match { + case r: SupportsPushDownAggregates => + val translatedAggregates = aggregates.map(DataSourceStrategy.translateAggregate) + val translatedGroupBys = groupBy.map(columnAsString) + + val agg = Aggregation(translatedAggregates.flatten, translatedGroupBys.flatten) Review comment: I think we can only apply pushdown if all the group by cols are supported. e.g. 
`GROUP BY a, substring(b), c`, it's wrong to pushdown `GROUP BY a, c` ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ########## @@ -17,61 +17,184 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.planning.{OperationHelper, ScanOperation} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.read.{Scan, V1Scan} +import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType -object V2ScanRelationPushDown extends Rule[LogicalPlan] { +object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper + with OperationHelper with PredicateHelper { import DataSourceV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case ScanOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + def apply(plan: LogicalPlan): LogicalPlan = { + applyColumnPruning(pushdownAggregate(pushDownFilters(createScanBuilder(plan)))) + } + + private def createScanBuilder(plan: LogicalPlan) = plan.transform { + case r: DataSourceV2Relation => + ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options)) + } - val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output) + private def pushDownFilters(plan: LogicalPlan) = plan.transform { + // update the scan builder with filter push down and return a new plan with filter pushed + case Filter(condition, sHolder: ScanBuilderHolder) => + val filters = splitConjunctivePredicates(condition) + val normalizedFilters = + DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output) val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) = normalizedFilters.partition(SubqueryExpression.hasSubquery) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFiltersWithoutSubquery) + sHolder.builder, normalizedFiltersWithoutSubquery) val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery + logInfo( + s""" + |Pushing operators to ${sHolder.relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + """.stripMargin) + + val filterCondition = postScanFilters.reduceLeftOption(And) + filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) + } + + def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { + // update the scan builder with agg pushdown and return a new plan with agg pushed + case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) => + child match { + case ScanOperation(project, filters, sHolder: ScanBuilderHolder) + if project.forall(_.isInstanceOf[AttributeReference]) => + sHolder.builder match { + case _: SupportsPushDownAggregates => + if (filters.length == 0) { // can't push down aggregate if postScanFilters exist + val aggregates = getAggregateExpression(resultExpressions, project, sHolder) + val pushedAggregates = PushDownUtils + .pushAggregates(sHolder.builder, aggregates, groupingExpressions) + if (pushedAggregates.aggregateExpressions.isEmpty) { + aggNode // return original plan node + } else { + // use the aggregate columns as the output columns + // e.g. TABLE t (c1 INT, c2 INT, c3 INT) + // SELECT min(c1), max(c1) FROM t; + // Use min(c1), max(c1) as output for DataSourceV2ScanRelation + // We want to have the following logical plan: + // == Optimized Logical Plan == + // Aggregate [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18] + // +- RelationV2[min(c1)#21, max(c1)#22] parquet file ... + var index = 0 + val output = resultExpressions.map { + case Alias(_, name) => Review comment: One idea to construct the output:
```
val newOutput = scan.readSchema().toAttributes
val groupAttrs = groupingExpressions.zip(newOutput).map {
  case (a: Attribute, b: Attribute) => b.withExprId(a.exprId)
  case (_, b) => b
}
val output = groupAttrs ++ newOutput.drop(groupAttrs.length)
```
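On the earlier point about `GROUP BY a, substring(b), c`: a rough sketch, not code from this PR, of an all-or-nothing translation step that `PushDownUtils.pushAggregates` could use. It reuses `columnAsString` and `DataSourceStrategy.translateAggregate` as they appear in the diff above (both assumed to return `Option`), applies the same rule to the aggregate list as to the grouping columns, and the object and method names are made up:
```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.connector.expressions.{Aggregation, FieldReference}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy

object AggregatePushDownSketch {
  // All-or-nothing translation: push the aggregation only if every aggregate function and
  // every grouping column can be translated; otherwise push nothing at all.
  def translateAllOrNothing(
      aggregates: Seq[AggregateExpression],
      groupBy: Seq[Expression],
      columnAsString: Expression => Option[FieldReference]): Option[Aggregation] = {
    val translatedAggregates = aggregates.map(DataSourceStrategy.translateAggregate)
    val translatedGroupBys = groupBy.map(columnAsString)
    // e.g. GROUP BY a, substring(b, 0, 3), c: substring(...) does not translate to a column,
    // and pushing down only GROUP BY a, c would change the grouping, hence the result.
    if (translatedAggregates.exists(_.isEmpty) || translatedGroupBys.exists(_.isEmpty)) {
      None // push nothing; Spark keeps the complete Aggregate on top of the scan
    } else {
      Some(Aggregation(translatedAggregates.flatten, translatedGroupBys.flatten))
    }
  }
}
```
A `None` result could then be mapped to an `Aggregation` with no aggregate expressions, so the existing `pushedAggregates.aggregateExpressions.isEmpty` check in `V2ScanRelationPushDown` would still fall back to the original `Aggregate` node.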
