dongjoon-hyun commented on code in PR #55518:
URL: https://github.com/apache/spark/pull/55518#discussion_r3192364149


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -50,20 +51,34 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
   protected def buildOperationTable(
       table: SupportsRowLevelOperations,
       command: Command,
-      options: CaseInsensitiveStringMap): RowLevelOperationTable = {
-    val info = RowLevelOperationInfoImpl(command, options)
+      options: CaseInsensitiveStringMap,
+      updatedColumns: Seq[NamedReference] = Nil): RowLevelOperationTable = {
+    val info = RowLevelOperationInfoImpl(command, options, updatedColumns)
     val operation = table.newRowLevelOperationBuilder(info).build()
     RowLevelOperationTable(table, operation)
   }
 
+  // Builds a DataSourceV2Relation for a row-level operation, optionally narrowing its output.
+  //
+  // When dataAttrs is non-empty, the relation output is narrowed to include only columns
+  // required for a column-update write. When dataAttrs is empty, the full relation.output is
+  // preserved.

Review Comment:
   For the function description, please follow the community Scaladoc style used in the
   other code paths:
   ```
   /**
    * ...
    */
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +104,92 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
       Filter(remainingRowFilter, readRelation))
 
-    // the new state is a union of updated and copied over records
-    val query = Union(updatedRowsPlan, remainingRowsPlan)
+    val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
 
-    // build a plan to replace read groups in the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildReplaceDataProjections(query, relation.output, 
metadataAttrs)
+    val query = updatedAndRemainingRowsPlan
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+    val projections = buildReplaceDataProjections(query, rowAttrs, 
metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     ReplaceData(writeRelation, cond, query, relation, projections, 
groupFilterCond)
   }
 
+  // Common read-relation setup shared by both CoW plan builders.
+  //
+  // When the connector supports column updates and declares required data attributes,
+  // the read relation is narrowed at analysis time so that
+  // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for the scan.
+  // Otherwise the full relation output is used.

Review Comment:
   For the function description, please follow the community Scaladoc style used in the
   other code paths:
   ```
   /**
    * ...
    */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to