aokolnychyi commented on code in PR #41700:
URL: https://github.com/apache/spark/pull/41700#discussion_r1239135361


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala:
##########
@@ -56,29 +62,67 @@ object GroupBasedRowLevelOperationScanPlanning extends 
Rule[LogicalPlan] with Pr
         s"""
            |Pushing operators to ${relation.name}
            |Pushed filters: $pushedFiltersStr
-           |Filters that were not pushed: ${remainingFilters.mkString(", ")}
+           |Filters evaluated on data source side: 
${evaluatedFilters.mkString(", ")}
+           |Filters evaluated on Spark side: ${postScanFilters.mkString(", ")}
            |Output: ${output.mkString(", ")}
          """.stripMargin)
 
-      // replace DataSourceV2Relation with DataSourceV2ScanRelation for the 
row operation table
-      // there may be multiple read relations for UPDATEs that are rewritten 
as UNION
-      rd transform {
+      rd transformDown {
+        // simplify the join condition in MERGE operations by discarding 
already evaluated filters
+        case j @ Join(
+            PhysicalOperation(_, _, r: DataSourceV2Relation), _, _, 
Some(cond), _)
+            if rd.operation.command == MERGE && evaluatedFilters.nonEmpty && 
r.table.eq(table) =>
+          j.copy(condition = Some(optimizeMergeJoinCondition(cond, 
evaluatedFilters)))
+
+        // replace DataSourceV2Relation with DataSourceV2ScanRelation for the 
row operation table
+        // there may be multiple read relations for UPDATEs that are rewritten 
as UNION
         case r: DataSourceV2Relation if r.table eq table =>
           DataSourceV2ScanRelation(r, scan, 
PushDownUtils.toOutputAttrs(scan.readSchema(), r))
       }
   }
 
+  // pushes down the operation condition and returns the following information:
+  // - pushed down filters
+  // - filter expressions that are fully evaluated on the data source side
+  //   (such filters can be discarded and don't have to be evaluated again on 
the Spark side)
+  // - post-scan filter expressions that must be evaluated on the Spark side
+  //   (such filters can overlap with pushed down filters, e.g. Parquet row 
group filtering)
   private def pushFilters(
       cond: Expression,
       tableAttrs: Seq[AttributeReference],
-      scanBuilder: ScanBuilder): (Either[Seq[Filter], Seq[V2Filter]], 
Seq[Expression]) = {
+      scanBuilder: ScanBuilder)
+  : (Either[Seq[Filter], Seq[V2Filter]], Seq[Expression], Seq[Expression]) = {

Review Comment:
   I wonder whether a type alias for this three-element tuple would make the return type more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to