aokolnychyi commented on code in PR #55518:
URL: https://github.com/apache/spark/pull/55518#discussion_r3423944549


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +105,78 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
       Filter(remainingRowFilter, readRelation))
 
-    // the new state is a union of updated and copied over records
-    val query = Union(updatedRowsPlan, remainingRowsPlan)
+    val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
 
-    // build a plan to replace read groups in the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildReplaceDataProjections(query, relation.output, 
metadataAttrs)
+    val projections = buildReplaceDataProjections(updatedAndRemainingRowsPlan, 
rowAttrs,
+      metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
-    ReplaceData(writeRelation, cond, query, relation, projections, 
groupFilterCond)
+    ReplaceData(writeRelation, cond, updatedAndRemainingRowsPlan, relation, 
projections,
+      groupFilterCond)
   }
 
-  // this method assumes the assignments have been already aligned before
+  /**
+   * When the connector supports column updates and declares required data 
attributes,
+   * the read relation is narrowed at analysis time so that 
GroupBasedRowLevelOperationScanPlanning
+   * uses only the needed columns for the scan. Otherwise, the full relation 
output is used.
+   */
+  private def buildReplaceDataReadRelation(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): (DataSourceV2Relation, Seq[Attribute], 
Seq[AttributeReference]) = {
+
+    val operation = operationTable.operation
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
+    val connectorDataAttrs = resolveRequiredDataAttrs(relation, operation)
+    val isNarrow = operation.supportsColumnUpdates() && 
connectorDataAttrs.nonEmpty
+
+    val readRelation = if (isNarrow) {
+      val allRequired = (connectorDataAttrs ++ 
computeAssignedAttrs(assignments)).distinct
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs, 
dataAttrs = allRequired,
+        cond = cond)
+    } else {
+      buildRelationWithAttrs(relation, operationTable, metadataAttrs)
+    }
+
+    val rowAttrs: Seq[Attribute] = if (isNarrow) connectorDataAttrs else 
relation.output
+
+    (readRelation, rowAttrs, metadataAttrs)
+  }
+
+  /**
+   * Builds the update projection for ReplaceData plans. Assumes assignments 
are already aligned.
+   *
+   * plan.output may be narrowed by buildReplaceDataReadRelation, so only 
columns present in the
+   * plan are projected.
+   */
   private def buildReplaceDataUpdateProjection(

Review Comment:
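   This is fragile too: `buildReplaceDataUpdateProjection` depends on `plan.output` possibly having been narrowed by `buildReplaceDataReadRelation`, and it projects only the columns present in the plan.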



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to