dongjoon-hyun commented on code in PR #55518:
URL: https://github.com/apache/spark/pull/55518#discussion_r3192416031


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +104,92 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
       Filter(remainingRowFilter, readRelation))
 
-    // the new state is a union of updated and copied over records
-    val query = Union(updatedRowsPlan, remainingRowsPlan)
+    val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
 
-    // build a plan to replace read groups in the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildReplaceDataProjections(query, relation.output, 
metadataAttrs)
+    val query = updatedAndRemainingRowsPlan
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+    val projections = buildReplaceDataProjections(query, rowAttrs, 
metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     ReplaceData(writeRelation, cond, query, relation, projections, 
groupFilterCond)
   }
 
+  // Common read-relation setup shared by both CoW plan builders.
+  //
+  // When the connector supports column updates and declares required data 
attributes,
+  // the read relation is narrowed at analysis time so that
+  // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for 
the scan.
+  // Otherwise the full relation output is used.
+  private def buildCoWReadSetup(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): (DataSourceV2Relation, Seq[Attribute]) = {
+
+    val operation = operationTable.operation
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
+    val connectorDataAttrs = resolveRequiredDataAttrs(relation, operation)
+    val isNarrow = operation.supportsColumnUpdates() && 
connectorDataAttrs.nonEmpty
+
+    // CoW scan narrowing must be done manually at analysis time.
+    // GroupBasedRowLevelOperationScanPlanning (an optimizer rule that fires 
after analysis)
+    // always reads relation.output directly when building the physical scan 
-- it does not
+    // observe Project nodes above the relation, so optimizer-driven column 
pruning has no
+    // effect on CoW scans.  We narrow DataSourceV2Relation.output here so 
that rule picks
+    // up the narrow set.
+    val readRelation = if (isNarrow) {
+      val allRequired = (connectorDataAttrs ++ 
computeAssignedAttrs(assignments)).distinct
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs, 
dataAttrs = allRequired,
+        cond = cond)
+    } else {
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs)
+    }
+
+    // CoW write schema (two paths only, no heuristic for CoW):
+    // - Narrow path (connectorDataAttrs declared): exactly connector-declared 
cols in declared
+    //   order.  The connector must declare ALL columns it wants to receive.
+    // - Full path (connectorDataAttrs empty OR supportsColumnUpdates=false): 
full table output.
+    //   Unlike MOR, CoW does not have a heuristic assigned-only path because
+    //   GroupBasedRowLevelOperationScanPlanning needs explicit column 
declarations to narrow.
+    val rowAttrs: Seq[Attribute] = if (isNarrow) connectorDataAttrs else 
relation.output
+
+    (readRelation, rowAttrs)

Review Comment:
   Please return `metadataAttrs` too to avoid recomputation on the caller side.
   > val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to