aokolnychyi commented on code in PR #41700:
URL: https://github.com/apache/spark/pull/41700#discussion_r1239135361
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala:
##########
@@ -56,29 +62,67 @@ object GroupBasedRowLevelOperationScanPlanning extends
Rule[LogicalPlan] with Pr
s"""
|Pushing operators to ${relation.name}
|Pushed filters: $pushedFiltersStr
- |Filters that were not pushed: ${remainingFilters.mkString(", ")}
+ |Filters evaluated on data source side:
${evaluatedFilters.mkString(", ")}
+ |Filters evaluated on Spark side: ${postScanFilters.mkString(", ")}
|Output: ${output.mkString(", ")}
""".stripMargin)
- // replace DataSourceV2Relation with DataSourceV2ScanRelation for the
row operation table
- // there may be multiple read relations for UPDATEs that are rewritten
as UNION
- rd transform {
+ rd transformDown {
+ // simplify the join condition in MERGE operations by discarding
already evaluated filters
+ case j @ Join(
+ PhysicalOperation(_, _, r: DataSourceV2Relation), _, _,
Some(cond), _)
+ if rd.operation.command == MERGE && evaluatedFilters.nonEmpty &&
r.table.eq(table) =>
+ j.copy(condition = Some(optimizeMergeJoinCondition(cond,
evaluatedFilters)))
+
+ // replace DataSourceV2Relation with DataSourceV2ScanRelation for the
row operation table
+ // there may be multiple read relations for UPDATEs that are rewritten
as UNION
case r: DataSourceV2Relation if r.table eq table =>
DataSourceV2ScanRelation(r, scan,
PushDownUtils.toOutputAttrs(scan.readSchema(), r))
}
}
+ // pushes down the operation condition and returns the following information:
+ // - pushed down filters
+ // - filter expressions that are fully evaluated on the data source side
+ // (such filters can be discarded and don't have to be evaluated again on
the Spark side)
+ // - post-scan filter expressions that must be evaluated on the Spark side
+ // (such filters can overlap with pushed down filters, e.g. Parquet row
group filtering)
private def pushFilters(
cond: Expression,
tableAttrs: Seq[AttributeReference],
- scanBuilder: ScanBuilder): (Either[Seq[Filter], Seq[V2Filter]],
Seq[Expression]) = {
+ scanBuilder: ScanBuilder)
+ : (Either[Seq[Filter], Seq[V2Filter]], Seq[Expression], Seq[Expression]) = {
Review Comment:
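I wonder whether a type alias for this three-element tuple (pushed filters, filters evaluated on the data source side, post-scan filters) would make the return type more readable.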
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]