aokolnychyi commented on code in PR #41577:
URL: https://github.com/apache/spark/pull/41577#discussion_r1228460068
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -123,14 +123,121 @@ object RewriteMergeIntoTable extends
RewriteRowLevelCommand with PredicateHelper
r, table, source, cond, matchedActions,
notMatchedActions, notMatchedBySourceActions)
case _ =>
- throw new AnalysisException("Group-based MERGE commands are not
supported yet")
+ buildReplaceDataPlan(
+ r, table, source, cond, matchedActions,
+ notMatchedActions, notMatchedBySourceActions)
}
case _ =>
m
}
}
+ // build a rewrite plan for sources that support replacing groups of data
(e.g. files, partitions)
+ // the produced ReplaceData plan reads affected groups, merges in source
changes, and writes back
+ // the complete new state of those groups
+ private def buildReplaceDataPlan(
+ relation: DataSourceV2Relation,
+ operationTable: RowLevelOperationTable,
+ source: LogicalPlan,
+ cond: Expression,
+ matchedActions: Seq[MergeAction],
+ notMatchedActions: Seq[MergeAction],
+ notMatchedBySourceActions: Seq[MergeAction]): ReplaceData = {
+
+ // resolve all required metadata attrs that may be used for grouping data
on write
+ // for instance, JDBC data source may cluster data by shard/host before
writing
+ val metadataAttrs = resolveRequiredMetadataAttrs(relation,
operationTable.operation)
+
+ // construct a read relation and include all required metadata columns
+ val readRelation = buildRelationWithAttrs(relation, operationTable,
metadataAttrs)
+
+ // whether the join must verify each target row matches at most one source
+ // row — presumably derived from the MATCHED actions; see
shouldCheckCardinality
+ val checkCardinality = shouldCheckCardinality(matchedActions)
+
+ // use left outer join if there is no NOT MATCHED action, unmatched source
rows can be discarded
+ // use full outer join in all other cases, unmatched source rows may be
needed
+ val joinType = if (notMatchedActions.isEmpty) LeftOuter else FullOuter
+ val joinPlan = join(readRelation, source, joinType, cond, checkCardinality)
+
+ // turn joined rows into output rows according to the MERGE actions
+ val mergeRowsPlan = buildReplaceDataMergeRowsPlan(
+ readRelation, joinPlan, matchedActions, notMatchedActions,
+ notMatchedBySourceActions, metadataAttrs, checkCardinality)
+
+ // predicates of the ON condition can be used to filter the target table
(planning & runtime)
+ // only if there is no NOT MATCHED BY SOURCE clause
+ // (with such a clause, unmatched target rows must still be read, so no
filtering is safe)
+ val (pushableCond, groupFilterCond) = if
(notMatchedBySourceActions.isEmpty) {
+ (cond, Some(toGroupFilterCondition(relation, source, cond)))
+ } else {
+ (TrueLiteral, None)
+ }
+
+ // build a plan to replace read groups in the table
+ val writeRelation = relation.copy(table = operationTable)
+ ReplaceData(writeRelation, pushableCond, mergeRowsPlan, relation,
groupFilterCond)
+ }
+
+ // builds the MergeRows node for group-based (replace data) MERGE: converts
+ // each joined row into zero or more output rows by evaluating the MATCHED,
+ // NOT MATCHED, and NOT MATCHED BY SOURCE instructions
+ private def buildReplaceDataMergeRowsPlan(
+ targetTable: LogicalPlan,
+ joinPlan: LogicalPlan,
+ matchedActions: Seq[MergeAction],
+ notMatchedActions: Seq[MergeAction],
+ notMatchedBySourceActions: Seq[MergeAction],
+ metadataAttrs: Seq[Attribute],
+ checkCardinality: Boolean): MergeRows = {
+
+ // target records that were read but did not match any MATCHED or NOT
MATCHED BY SOURCE actions
+ // must be copied over and included in the new state of the table as
groups are being replaced
+ // that's why an extra unconditional instruction that would produce the
original row is added
+ // as the last MATCHED and NOT MATCHED BY SOURCE instruction
+ // this logic is specific to data sources that replace groups of data
+ val keepCarryoverRowsInstruction = Keep(TrueLiteral, targetTable.output)
+
+ val matchedInstructions = matchedActions.map { action =>
+ toInstruction(action, metadataAttrs)
+ } :+ keepCarryoverRowsInstruction
+
+ // NOT MATCHED rows come only from the source, so there is nothing to
carry over here
+ val notMatchedInstructions = notMatchedActions.map { action =>
+ toInstruction(action, metadataAttrs)
+ }
+
+ val notMatchedBySourceInstructions = notMatchedBySourceActions.map {
action =>
+ toInstruction(action, metadataAttrs)
+ } :+ keepCarryoverRowsInstruction
+
+ // marker attrs produced by the join; a null marker means the corresponding
+ // side did not contribute a row (see IsNotNull checks below)
+ val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE, joinPlan)
+ val rowFromTargetAttr = resolveAttrRef(ROW_FROM_TARGET, joinPlan)
+
+ val outputs = matchedInstructions.flatMap(_.outputs) ++
+ notMatchedInstructions.flatMap(_.outputs) ++
+ notMatchedBySourceInstructions.flatMap(_.outputs)
+
+ val attrs = targetTable.output
+
+ MergeRows(
+ isSourceRowPresent = IsNotNull(rowFromSourceAttr),
+ isTargetRowPresent = IsNotNull(rowFromTargetAttr),
+ matchedInstructions = matchedInstructions,
+ notMatchedInstructions = notMatchedInstructions,
+ notMatchedBySourceInstructions = notMatchedBySourceInstructions,
+ checkCardinality = checkCardinality,
+ // reconcile all instruction outputs into a single output schema
+ output = generateExpandOutput(attrs, outputs),
+ joinPlan)
+ }
+
+ // converts a MERGE condition into an EXISTS subquery for runtime filtering
+ private def toGroupFilterCondition(
Review Comment:
MERGE conditions, unlike DELETE conditions, look like `t.id = s.id` and
reference columns from both sides. We cannot use such conditions for runtime
filtering without extra work. That's why we transform MERGE conditions into
executable subqueries here. This step cannot be done in the rule for runtime
filtering because it does not have access to the source relation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]