anuragmantri commented on code in PR #55518:
URL: https://github.com/apache/spark/pull/55518#discussion_r3196568267


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -105,4 +105,45 @@ default String description() {
   default NamedReference[] requiredMetadataAttributes() {
     return new NamedReference[0];
   }
+
+
+  /**
+   * Controls whether to send only the required data columns to the connector 
rather than the
+   * full row.
+   * <p>
+   * When true, Spark narrows the data column schema ({@link 
LogicalWriteInfo#schema()}) to only
+   * the columns declared via {@link #requiredDataAttributes()}. Metadata 
columns (from
+   * {@link #requiredMetadataAttributes()}) and row ID columns (from
+   * {@link SupportsDelta#rowId()}) are unaffected and always projected 
separately.
+   * <p>
+   * If {@link #requiredDataAttributes()} returns a non-empty array, the write 
schema is exactly
+   * those columns in declared order. The connector must include all columns 
it wants to receive,
+   * including the columns being updated. If {@link #requiredDataAttributes()} 
returns an empty
+   * array, Spark sends only the non-identity assigned columns (heuristic 
path).
+   *
+   * @since 4.2.0

Review Comment:
   Done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -50,20 +51,34 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
   protected def buildOperationTable(
       table: SupportsRowLevelOperations,
       command: Command,
-      options: CaseInsensitiveStringMap): RowLevelOperationTable = {
-    val info = RowLevelOperationInfoImpl(command, options)
+      options: CaseInsensitiveStringMap,
+      updatedColumns: Seq[NamedReference] = Nil): RowLevelOperationTable = {
+    val info = RowLevelOperationInfoImpl(command, options, updatedColumns)
     val operation = table.newRowLevelOperationBuilder(info).build()
     RowLevelOperationTable(table, operation)
   }
 
+  // Builds a DataSourceV2Relation for a row-level operation, optionally 
narrowing its output.
+  //
+  // When dataAttrs is non-empty, the relation output is narrowed to include 
only columns
+  // required for a column-update write. When dataAttrs is empty, the full 
relation.output is
+  // preserved.

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -65,18 +72,15 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
       assignments: Seq[Assignment],
       cond: Expression): ReplaceData = {
 
-    // resolve all required metadata attrs that may be used for grouping data 
on write
-    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
-
-    // construct a read relation and include all required metadata columns
-    val readRelation = buildRelationWithAttrs(relation, operationTable, 
metadataAttrs)
+    val (readRelation, rowAttrs) = buildCoWReadSetup(relation, operationTable, 
assignments, cond)
 
-    // build a plan with updated and copied over records
-    val query = buildReplaceDataUpdateProjection(readRelation, assignments, 
cond)
+    val updatedAndRemainingRowsPlan = buildReplaceDataUpdateProjection(
+      readRelation, assignments, cond)
 
-    // build a plan to replace read groups in the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildReplaceDataProjections(query, relation.output, 
metadataAttrs)
+    val query = updatedAndRemainingRowsPlan

Review Comment:
   Done, used a single variable



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -105,4 +105,45 @@ default String description() {
   default NamedReference[] requiredMetadataAttributes() {
     return new NamedReference[0];
   }
+
+

Review Comment:
   Done



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -105,4 +105,45 @@ default String description() {
   default NamedReference[] requiredMetadataAttributes() {
     return new NamedReference[0];
   }
+
+
+  /**
+   * Controls whether to send only the required data columns to the connector 
rather than the
+   * full row.
+   * <p>
+   * When true, Spark narrows the data column schema ({@link 
LogicalWriteInfo#schema()}) to only
+   * the columns declared via {@link #requiredDataAttributes()}. Metadata 
columns (from
+   * {@link #requiredMetadataAttributes()}) and row ID columns (from
+   * {@link SupportsDelta#rowId()}) are unaffected and always projected 
separately.
+   * <p>
+   * If {@link #requiredDataAttributes()} returns a non-empty array, the write 
schema is exactly
+   * those columns in declared order. The connector must include all columns 
it wants to receive,
+   * including the columns being updated. If {@link #requiredDataAttributes()} 
returns an empty
+   * array, Spark sends only the non-identity assigned columns (heuristic 
path).
+   *
+   * @since 4.2.0
+   */
+  default boolean supportsColumnUpdates() {
+    return false;
+  }
+
+  /**
+   * Returns data column references required to perform this row-level 
operation.
+   * <p>
+   * This method is only consulted by Spark when {@link 
#supportsColumnUpdates()} returns
+   * {@code true}. If {@code supportsColumnUpdates()} returns {@code false}, 
the returned array
+   * is ignored and the full table row is sent (the default behavior).
+   * <p>
+   * When non-empty, the returned columns become the write schema in declared 
order.
+   * The connector must declare all columns it wants to receive, including the 
columns being
+   * updated. Use {@link RowLevelOperationInfo#updatedColumns()} to learn 
which columns are being
+   * assigned, then add any extra columns needed for row lookup or routing 
(e.g., primary key).
+   * <p>
+   * When empty (the default), Spark falls back to sending only the 
non-identity assigned columns.
+   *
+   * @since 4.2.0
+   */
+  default NamedReference[] requiredDataAttributes() {

Review Comment:
   Even though the scope of this PR is UPDATE only, we'd like this API to work 
for MERGE as well (DELETE doesn't benefit since it doesn't write data columns). 
I'm still assessing what it takes and will add a section in the SPIP on how it 
could be implemented.
    
   Happy to add a "currently only consulted for UPDATE" note in the Javadoc for 
now and remove it when MERGE support lands. 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -41,7 +42,13 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
 
       EliminateSubqueryAliases(aliasedTable) match {
         case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
-          val table = buildOperationTable(tbl, UPDATE, 
CaseInsensitiveStringMap.empty())
+          val updatedCols = assignments.collect {
+            case Assignment(key: AttributeReference, value)
+                if !isIdentityAssignment(key, value) =>

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +104,92 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
       Filter(remainingRowFilter, readRelation))
 
-    // the new state is a union of updated and copied over records
-    val query = Union(updatedRowsPlan, remainingRowsPlan)
+    val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
 
-    // build a plan to replace read groups in the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildReplaceDataProjections(query, relation.output, 
metadataAttrs)
+    val query = updatedAndRemainingRowsPlan
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+    val projections = buildReplaceDataProjections(query, rowAttrs, 
metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     ReplaceData(writeRelation, cond, query, relation, projections, 
groupFilterCond)
   }
 
+  // Common read-relation setup shared by both CoW plan builders.
+  //
+  // When the connector supports column updates and declares required data 
attributes,
+  // the read relation is narrowed at analysis time so that
+  // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for 
the scan.
+  // Otherwise the full relation output is used.
+  private def buildCoWReadSetup(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): (DataSourceV2Relation, Seq[Attribute]) = {
+
+    val operation = operationTable.operation
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
+    val connectorDataAttrs = resolveRequiredDataAttrs(relation, operation)
+    val isNarrow = operation.supportsColumnUpdates() && 
connectorDataAttrs.nonEmpty
+
+    // CoW scan narrowing must be done manually at analysis time.
+    // GroupBasedRowLevelOperationScanPlanning (an optimizer rule that fires 
after analysis)
+    // always reads relation.output directly when building the physical scan 
-- it does not
+    // observe Project nodes above the relation, so optimizer-driven column 
pruning has no
+    // effect on CoW scans.  We narrow DataSourceV2Relation.output here so 
that rule picks
+    // up the narrow set.
+    val readRelation = if (isNarrow) {
+      val allRequired = (connectorDataAttrs ++ 
computeAssignedAttrs(assignments)).distinct
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs, 
dataAttrs = allRequired,
+        cond = cond)
+    } else {
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs)
+    }
+
+    // CoW write schema (two paths only, no heuristic for CoW):
+    // - Narrow path (connectorDataAttrs declared): exactly connector-declared 
cols in declared
+    //   order.  The connector must declare ALL columns it wants to receive.
+    // - Full path (connectorDataAttrs empty OR supportsColumnUpdates=false): 
full table output.
+    //   Unlike MOR, CoW does not have a heuristic assigned-only path because
+    //   GroupBasedRowLevelOperationScanPlanning needs explicit column 
declarations to narrow.
+    val rowAttrs: Seq[Attribute] = if (isNarrow) connectorDataAttrs else 
relation.output
+
+    (readRelation, rowAttrs)

Review Comment:
   I changed this to return metadataAttrs too. 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +104,92 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
       Filter(remainingRowFilter, readRelation))
 
-    // the new state is a union of updated and copied over records
-    val query = Union(updatedRowsPlan, remainingRowsPlan)
+    val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
 
-    // build a plan to replace read groups in the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildReplaceDataProjections(query, relation.output, 
metadataAttrs)
+    val query = updatedAndRemainingRowsPlan
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+    val projections = buildReplaceDataProjections(query, rowAttrs, 
metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     ReplaceData(writeRelation, cond, query, relation, projections, 
groupFilterCond)
   }
 
+  // Common read-relation setup shared by both CoW plan builders.
+  //
+  // When the connector supports column updates and declares required data 
attributes,
+  // the read relation is narrowed at analysis time so that
+  // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for 
the scan.
+  // Otherwise the full relation output is used.

Review Comment:
   Done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +104,92 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
       Filter(remainingRowFilter, readRelation))
 
-    // the new state is a union of updated and copied over records
-    val query = Union(updatedRowsPlan, remainingRowsPlan)
+    val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
 
-    // build a plan to replace read groups in the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildReplaceDataProjections(query, relation.output, 
metadataAttrs)
+    val query = updatedAndRemainingRowsPlan
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+    val projections = buildReplaceDataProjections(query, rowAttrs, 
metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     ReplaceData(writeRelation, cond, query, relation, projections, 
groupFilterCond)
   }
 
+  // Common read-relation setup shared by both CoW plan builders.
+  //
+  // When the connector supports column updates and declares required data 
attributes,
+  // the read relation is narrowed at analysis time so that
+  // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for 
the scan.
+  // Otherwise the full relation output is used.
+  private def buildCoWReadSetup(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): (DataSourceV2Relation, Seq[Attribute]) = {
+
+    val operation = operationTable.operation
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
+    val connectorDataAttrs = resolveRequiredDataAttrs(relation, operation)
+    val isNarrow = operation.supportsColumnUpdates() && 
connectorDataAttrs.nonEmpty
+
+    // CoW scan narrowing must be done manually at analysis time.
+    // GroupBasedRowLevelOperationScanPlanning (an optimizer rule that fires 
after analysis)
+    // always reads relation.output directly when building the physical scan 
-- it does not
+    // observe Project nodes above the relation, so optimizer-driven column 
pruning has no
+    // effect on CoW scans.  We narrow DataSourceV2Relation.output here so 
that rule picks
+    // up the narrow set.
+    val readRelation = if (isNarrow) {
+      val allRequired = (connectorDataAttrs ++ 
computeAssignedAttrs(assignments)).distinct
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs, 
dataAttrs = allRequired,
+        cond = cond)
+    } else {
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs)
+    }
+
+    // CoW write schema (two paths only, no heuristic for CoW):
+    // - Narrow path (connectorDataAttrs declared): exactly connector-declared 
cols in declared
+    //   order.  The connector must declare ALL columns it wants to receive.
+    // - Full path (connectorDataAttrs empty OR supportsColumnUpdates=false): 
full table output.
+    //   Unlike MOR, CoW does not have a heuristic assigned-only path because
+    //   GroupBasedRowLevelOperationScanPlanning needs explicit column 
declarations to narrow.
+    val rowAttrs: Seq[Attribute] = if (isNarrow) connectorDataAttrs else 
relation.output
+
+    (readRelation, rowAttrs)
+  }
+
   // this method assumes the assignments have been already aligned before
+  //
+  // Works for both the full-scan and narrow-scan CoW paths.  In the narrow 
case,
+  // readRelation.output is already restricted by buildCoWReadSetup, so 
projecting
+  // all plan.output gives the correct narrow write schema.

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -154,30 +206,145 @@ object RewriteUpdateTable extends RewriteRowLevelCommand 
{
       cond: Expression): WriteDelta = {
 
     val operation = operationTable.operation.asInstanceOf[SupportsDelta]
+    // Column-update support applies to the standard delta path and the 
delete+reinsert path.
+    // When representUpdateAsDeleteAndInsert is true, the REINSERT leg of the 
Expand already
+    // uses only assigned values, so the narrow effectiveRowAttrs applies 
correctly.
+    val supportsColumnUpdate = operation.supportsColumnUpdates()
 
     // resolve all needed attrs (e.g. row ID and any required metadata attrs)
-    val rowAttrs = relation.output
     val rowIdAttrs = resolveRowIdAttrs(relation, operation)
     val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
 
-    // construct a read relation and include all required metadata columns
+    // Connector-declared data attrs used to determine pass-through columns in 
the write plan.
+    val connectorDataAttrs = if (supportsColumnUpdate) {
+      resolveRequiredDataAttrs(relation, operation)
+    } else Nil
+
+    // MOR uses a full-schema scan; ColumnPruning narrows it via Project 
references.
     val readRelation = buildRelationWithAttrs(relation, operationTable, 
metadataAttrs, rowIdAttrs)
 
+    // Connector-required attrs that are NOT being assigned are added as 
pass-throughs in the
+    // plan so that ColumnPruning keeps them in the physical scan AND the 
connector receives
+    // their current values via DeltaWriter.update's row argument.
+    val assignedAttrs = if (supportsColumnUpdate) 
computeAssignedAttrs(assignments)
+                        else relation.output
+    val connectorExtraAttrs: Seq[AttributeReference] = if 
(connectorDataAttrs.nonEmpty) {
+      val assignedAttrSet = AttributeSet(assignedAttrs)
+      connectorDataAttrs.filterNot(assignedAttrSet.contains)
+    } else Nil
+
     // build a plan for updated records that match the condition
     val matchedRowsPlan = Filter(cond, readRelation)
     val rowDeltaPlan = if (operation.representUpdateAsDeleteAndInsert) {
       buildDeletesAndInserts(matchedRowsPlan, assignments, rowIdAttrs)
+    } else if (supportsColumnUpdate) {
+      buildColumnUpdateProjection(
+        matchedRowsPlan, assignments, rowIdAttrs, metadataAttrs, 
connectorExtraAttrs)
     } else {
       buildWriteDeltaUpdateProjection(matchedRowsPlan, assignments, rowIdAttrs)
     }
 
+    // Effective row write schema:
+    // - Narrow path (connectorDataAttrs declared): exactly connector-declared 
cols in declared
+    //   order.  The connector must declare ALL columns it wants to receive 
(including updated
+    //   ones).  This mirrors the metadata pattern and enables strict 
areCompatible validation.
+    // - Heuristic path (connectorDataAttrs empty): only the assigned 
(changed) columns.
+    // - Full path (no column-update support): full table output.
+    val effectiveRowAttrs = if (supportsColumnUpdate && 
connectorDataAttrs.nonEmpty) {
+      connectorDataAttrs
+    } else if (supportsColumnUpdate) {
+      assignedAttrs
+    } else {
+      relation.output
+    }
+
     // build a plan to write the row delta to the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildWriteDeltaProjections(rowDeltaPlan, rowAttrs, 
rowIdAttrs, metadataAttrs)
+    val projections = buildWriteDeltaProjections(
+      rowDeltaPlan, effectiveRowAttrs, rowIdAttrs, metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     WriteDelta(writeRelation, cond, rowDeltaPlan, relation, projections, 
groupFilterCond)
   }
 
+  // Builds the row delta projection for the column update path.
+  //
+  // The resulting Project references only:
+  //   - assigned column values (new values being written)
+  //   - connector pass-through values (connector declared but not assigned)
+  //   - metadata columns (nulled or preserved)
+  //   - row ID columns (for delta identification)
+  //   - original row ID values (only when a row ID column is being reassigned)
+  //
+  // ColumnPruning observes exactly these references and narrows the physical 
scan accordingly.
+  // Connectors that need additional columns in the scan (e.g., partition 
columns for
+  // distribution) should declare them in requiredDataAttributes().
+  //
+  // Note: AlignUpdateAssignments guarantees all assignment keys are top-level

Review Comment:
   I added a new test `test("column-update: nested struct field update narrows 
to the root struct column")` that updates an inner field in a struct, the 
`AlignUpdateAssignment` returns only the root key. 



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperationInfo.java:
##########
@@ -37,4 +38,20 @@ public interface RowLevelOperationInfo {
    * Returns the row-level SQL command (e.g. DELETE, UPDATE, MERGE).
    */
   Command command();
+
+  /**
+   * Returns the columns being updated in an UPDATE statement, as non-identity 
assignments.
+   *
+   * <p>For DELETE and MERGE, returns an empty array.
+   *
+   * <p>Connectors can use this to decide what {@link 
RowLevelOperation#requiredDataAttributes()}
+   * to declare. For instance, a connector that needs its primary key for row 
lookup can check
+   * whether pk is already in the updated columns list and, if not, add it to
+   * requiredDataAttributes().
+   *
+   * @since 4.2.0

Review Comment:
   Done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -470,7 +506,28 @@ case class WriteDelta(
       case Some(projection) => DataTypeUtils.toAttributes(projection.schema)
       case None => Nil
     }
-    table.skipSchemaResolution || areCompatible(inRowAttrs, outRowAttrs)
+    table.skipSchemaResolution ||
+      areCompatible(inRowAttrs, outRowAttrs) ||
+      dataAttrsResolved(inRowAttrs)

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -154,30 +206,145 @@ object RewriteUpdateTable extends RewriteRowLevelCommand 
{
       cond: Expression): WriteDelta = {
 
     val operation = operationTable.operation.asInstanceOf[SupportsDelta]
+    // Column-update support applies to the standard delta path and the 
delete+reinsert path.
+    // When representUpdateAsDeleteAndInsert is true, the REINSERT leg of the 
Expand already
+    // uses only assigned values, so the narrow effectiveRowAttrs applies 
correctly.
+    val supportsColumnUpdate = operation.supportsColumnUpdates()
 
     // resolve all needed attrs (e.g. row ID and any required metadata attrs)
-    val rowAttrs = relation.output
     val rowIdAttrs = resolveRowIdAttrs(relation, operation)
     val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
 
-    // construct a read relation and include all required metadata columns
+    // Connector-declared data attrs used to determine pass-through columns in 
the write plan.
+    val connectorDataAttrs = if (supportsColumnUpdate) {
+      resolveRequiredDataAttrs(relation, operation)
+    } else Nil
+
+    // MOR uses a full-schema scan; ColumnPruning narrows it via Project 
references.
     val readRelation = buildRelationWithAttrs(relation, operationTable, 
metadataAttrs, rowIdAttrs)
 
+    // Connector-required attrs that are NOT being assigned are added as 
pass-throughs in the
+    // plan so that ColumnPruning keeps them in the physical scan AND the 
connector receives
+    // their current values via DeltaWriter.update's row argument.
+    val assignedAttrs = if (supportsColumnUpdate) 
computeAssignedAttrs(assignments)
+                        else relation.output
+    val connectorExtraAttrs: Seq[AttributeReference] = if 
(connectorDataAttrs.nonEmpty) {
+      val assignedAttrSet = AttributeSet(assignedAttrs)
+      connectorDataAttrs.filterNot(assignedAttrSet.contains)
+    } else Nil
+
     // build a plan for updated records that match the condition
     val matchedRowsPlan = Filter(cond, readRelation)
     val rowDeltaPlan = if (operation.representUpdateAsDeleteAndInsert) {
       buildDeletesAndInserts(matchedRowsPlan, assignments, rowIdAttrs)
+    } else if (supportsColumnUpdate) {
+      buildColumnUpdateProjection(
+        matchedRowsPlan, assignments, rowIdAttrs, metadataAttrs, 
connectorExtraAttrs)
     } else {
       buildWriteDeltaUpdateProjection(matchedRowsPlan, assignments, rowIdAttrs)
     }
 
+    // Effective row write schema:
+    // - Narrow path (connectorDataAttrs declared): exactly connector-declared 
cols in declared
+    //   order.  The connector must declare ALL columns it wants to receive 
(including updated
+    //   ones).  This mirrors the metadata pattern and enables strict 
areCompatible validation.
+    // - Heuristic path (connectorDataAttrs empty): only the assigned 
(changed) columns.
+    // - Full path (no column-update support): full table output.
+    val effectiveRowAttrs = if (supportsColumnUpdate && 
connectorDataAttrs.nonEmpty) {
+      connectorDataAttrs
+    } else if (supportsColumnUpdate) {
+      assignedAttrs
+    } else {
+      relation.output
+    }
+
     // build a plan to write the row delta to the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildWriteDeltaProjections(rowDeltaPlan, rowAttrs, 
rowIdAttrs, metadataAttrs)
+    val projections = buildWriteDeltaProjections(
+      rowDeltaPlan, effectiveRowAttrs, rowIdAttrs, metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     WriteDelta(writeRelation, cond, rowDeltaPlan, relation, projections, 
groupFilterCond)
   }
 
+  // Builds the row delta projection for the column update path.

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -378,7 +386,35 @@ case class ReplaceData(
   // validates row projection output is compatible with table attributes
   private def rowAttrsResolved: Boolean = {
     val inRowAttrs = 
DataTypeUtils.toAttributes(projections.rowProjection.schema)
-    table.skipSchemaResolution || areCompatible(inRowAttrs, table.output)
+    table.skipSchemaResolution ||
+      areCompatible(inRowAttrs, table.output) ||
+      dataAttrsResolved(inRowAttrs)
+  }
+
+  // Validates the narrow-write-schema row projection output.

Review Comment:
   Done.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -105,4 +105,45 @@ default String description() {
   default NamedReference[] requiredMetadataAttributes() {
     return new NamedReference[0];
   }
+
+
+  /**
+   * Controls whether to send only the required data columns to the connector 
rather than the
+   * full row.
+   * <p>
+   * When true, Spark narrows the data column schema ({@link 
LogicalWriteInfo#schema()}) to only
+   * the columns declared via {@link #requiredDataAttributes()}. Metadata 
columns (from
+   * {@link #requiredMetadataAttributes()}) and row ID columns (from
+   * {@link SupportsDelta#rowId()}) are unaffected and always projected 
separately.
+   * <p>
+   * If {@link #requiredDataAttributes()} returns a non-empty array, the write 
schema is exactly
+   * those columns in declared order. The connector must include all columns 
it wants to receive,
+   * including the columns being updated. If {@link #requiredDataAttributes()} 
returns an empty
+   * array, Spark sends only the non-identity assigned columns (heuristic 
path).
+   *
+   * @since 4.2.0
+   */
+  default boolean supportsColumnUpdates() {
+    return false;
+  }
+
+  /**
+   * Returns data column references required to perform this row-level 
operation.
+   * <p>
+   * This method is only consulted by Spark when {@link 
#supportsColumnUpdates()} returns
+   * {@code true}. If {@code supportsColumnUpdates()} returns {@code false}, 
the returned array
+   * is ignored and the full table row is sent (the default behavior).
+   * <p>
+   * When non-empty, the returned columns become the write schema in declared 
order.
+   * The connector must declare all columns it wants to receive, including the 
columns being

Review Comment:
   Each column the connector returns passes through 
`V2ExpressionUtils.resolveRefs` which throws `AnalysisException` if the column 
is non existent. 
   
   I added a test `test("column-update: requiredDataAttributes throws 
AnalysisException for invalid column")`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -154,30 +206,145 @@ object RewriteUpdateTable extends RewriteRowLevelCommand 
{
       cond: Expression): WriteDelta = {
 
     val operation = operationTable.operation.asInstanceOf[SupportsDelta]
+    // Column-update support applies to the standard delta path and the 
delete+reinsert path.
+    // When representUpdateAsDeleteAndInsert is true, the REINSERT leg of the 
Expand already
+    // uses only assigned values, so the narrow effectiveRowAttrs applies 
correctly.
+    val supportsColumnUpdate = operation.supportsColumnUpdates()
 
     // resolve all needed attrs (e.g. row ID and any required metadata attrs)
-    val rowAttrs = relation.output
     val rowIdAttrs = resolveRowIdAttrs(relation, operation)
     val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
 
-    // construct a read relation and include all required metadata columns
+    // Connector-declared data attrs used to determine pass-through columns in 
the write plan.
+    val connectorDataAttrs = if (supportsColumnUpdate) {
+      resolveRequiredDataAttrs(relation, operation)
+    } else Nil
+
+    // MOR uses a full-schema scan; ColumnPruning narrows it via Project 
references.
     val readRelation = buildRelationWithAttrs(relation, operationTable, 
metadataAttrs, rowIdAttrs)
 
+    // Connector-required attrs that are NOT being assigned are added as 
pass-throughs in the
+    // plan so that ColumnPruning keeps them in the physical scan AND the 
connector receives
+    // their current values via DeltaWriter.update's row argument.
+    val assignedAttrs = if (supportsColumnUpdate) 
computeAssignedAttrs(assignments)
+                        else relation.output
+    val connectorExtraAttrs: Seq[AttributeReference] = if 
(connectorDataAttrs.nonEmpty) {
+      val assignedAttrSet = AttributeSet(assignedAttrs)
+      connectorDataAttrs.filterNot(assignedAttrSet.contains)
+    } else Nil
+
     // build a plan for updated records that match the condition
     val matchedRowsPlan = Filter(cond, readRelation)
     val rowDeltaPlan = if (operation.representUpdateAsDeleteAndInsert) {
       buildDeletesAndInserts(matchedRowsPlan, assignments, rowIdAttrs)
+    } else if (supportsColumnUpdate) {
+      buildColumnUpdateProjection(
+        matchedRowsPlan, assignments, rowIdAttrs, metadataAttrs, 
connectorExtraAttrs)
     } else {
       buildWriteDeltaUpdateProjection(matchedRowsPlan, assignments, rowIdAttrs)
     }
 
+    // Effective row write schema:
+    // - Narrow path (connectorDataAttrs declared): exactly connector-declared 
cols in declared
+    //   order.  The connector must declare ALL columns it wants to receive 
(including updated
+    //   ones).  This mirrors the metadata pattern and enables strict 
areCompatible validation.
+    // - Heuristic path (connectorDataAttrs empty): only the assigned 
(changed) columns.
+    // - Full path (no column-update support): full table output.
+    val effectiveRowAttrs = if (supportsColumnUpdate && 
connectorDataAttrs.nonEmpty) {
+      connectorDataAttrs
+    } else if (supportsColumnUpdate) {
+      assignedAttrs
+    } else {
+      relation.output
+    }
+
     // build a plan to write the row delta to the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildWriteDeltaProjections(rowDeltaPlan, rowAttrs, 
rowIdAttrs, metadataAttrs)
+    val projections = buildWriteDeltaProjections(
+      rowDeltaPlan, effectiveRowAttrs, rowIdAttrs, metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     WriteDelta(writeRelation, cond, rowDeltaPlan, relation, projections, 
groupFilterCond)
   }
 
+  // Builds the row delta projection for the column update path.
+  //
+  // The resulting Project references only:
+  //   - assigned column values (new values being written)
+  //   - connector pass-through values (connector declared but not assigned)
+  //   - metadata columns (nulled or preserved)
+  //   - row ID columns (for delta identification)
+  //   - original row ID values (only when a row ID column is being reassigned)
+  //
+  // ColumnPruning observes exactly these references and narrows the physical 
scan accordingly.
+  // Connectors that need additional columns in the scan (e.g., partition 
columns for
+  // distribution) should declare them in requiredDataAttributes().

Review Comment:
   Each column the connector returns passes through 
V2ExpressionUtils.resolveRefs which throws AnalysisException if the column is 
non existent.
   
   I added a test test("column-update: requiredDataAttributes throws 
AnalysisException for invalid column")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to