dongjoon-hyun commented on code in PR #55518:
URL: https://github.com/apache/spark/pull/55518#discussion_r3192418867
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +104,92 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
Filter(remainingRowFilter, readRelation))
- // the new state is a union of updated and copied over records
- val query = Union(updatedRowsPlan, remainingRowsPlan)
+ val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
- // build a plan to replace read groups in the table
val writeRelation = relation.copy(table = operationTable)
- val projections = buildReplaceDataProjections(query, relation.output,
metadataAttrs)
+ val query = updatedAndRemainingRowsPlan
+ val metadataAttrs = resolveRequiredMetadataAttrs(relation,
operationTable.operation)
+ val projections = buildReplaceDataProjections(query, rowAttrs,
metadataAttrs)
val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
ReplaceData(writeRelation, cond, query, relation, projections,
groupFilterCond)
}
+ // Common read-relation setup shared by both CoW plan builders.
+ //
+ // When the connector supports column updates and declares required data attributes,
+ // the read relation is narrowed at analysis time so that
+ // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for the scan.
+ // Otherwise the full relation output is used.
+ private def buildCoWReadSetup(
+ relation: DataSourceV2Relation,
+ operationTable: RowLevelOperationTable,
+ assignments: Seq[Assignment],
+ cond: Expression): (DataSourceV2Relation, Seq[Attribute]) = {
+
+ val operation = operationTable.operation
+ val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
+ val connectorDataAttrs = resolveRequiredDataAttrs(relation, operation)
+ val isNarrow = operation.supportsColumnUpdates() &&
connectorDataAttrs.nonEmpty
+
+ // CoW scan narrowing must be done manually at analysis time.
+ // GroupBasedRowLevelOperationScanPlanning (an optimizer rule that fires after analysis)
+ // always reads relation.output directly when building the physical scan -- it does not
+ // observe Project nodes above the relation, so optimizer-driven column pruning has no
+ // effect on CoW scans. We narrow DataSourceV2Relation.output here so that rule picks
+ // up the narrow set.
+ val readRelation = if (isNarrow) {
+ val allRequired = (connectorDataAttrs ++
computeAssignedAttrs(assignments)).distinct
+ buildRelationWithAttrs(relation, operationTable, metadataAttrs,
dataAttrs = allRequired,
+ cond = cond)
+ } else {
+ buildRelationWithAttrs(relation, operationTable, metadataAttrs)
+ }
+
+ // CoW write schema (two paths only, no heuristic for CoW):
+ // - Narrow path (connectorDataAttrs declared): exactly connector-declared cols in declared
+ // order. The connector must declare ALL columns it wants to receive.
+ // - Full path (connectorDataAttrs empty OR supportsColumnUpdates=false): full table output.
+ // Unlike MOR, CoW does not have a heuristic assigned-only path because
+ // GroupBasedRowLevelOperationScanPlanning needs explicit column declarations to narrow.
+ val rowAttrs: Seq[Attribute] = if (isNarrow) connectorDataAttrs else
relation.output
+
+ (readRelation, rowAttrs)
+ }
+
// this method assumes the assignments have been already aligned before
+ //
+ // Works for both the full-scan and narrow-scan CoW paths. In the narrow case,
+ // readRelation.output is already restricted by buildCoWReadSetup, so projecting
+ // all plan.output gives the correct narrow write schema.
Review Comment:
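Please use function description style for this comment (e.g., a Scaladoc `/** ... */` block instead of `//` line comments), since it documents the method that follows.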
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]