aokolnychyi commented on code in PR #35395:
URL: https://github.com/apache/spark/pull/35395#discussion_r848736408


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -176,6 +178,77 @@ object OverwritePartitionsDynamic {
   }
 }
 
+trait RowLevelWrite extends V2WriteCommand with SupportsSubquery {
+  def operation: RowLevelOperation
+  def condition: Expression
+  def originalTable: NamedRelation
+}
+
+/**
+ * Replace groups of data in an existing table during a row-level operation.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE 
operations for data sources
+ * that can replace groups of data (e.g. files, partitions).
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching groups
+ * @param query a query with records that should replace the records that were 
read
+ * @param originalTable a plan for the original table for which the row-level 
command was triggered
+ * @param write a logical write, if already constructed
+ */
+case class ReplaceData(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    write: Option[Write] = None) extends RowLevelWrite {
+
+  override lazy val isByName: Boolean = false
+  override lazy val references: AttributeSet = query.outputSet
+  override lazy val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  lazy val operation: RowLevelOperation = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, 
_) =>
+        operation
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from 
$table")
+    }
+  }
+
+  // the incoming query may include metadata columns
+  lazy val dataInput: Seq[Attribute] = {
+    val tableAttrNames = table.output.map(_.name)
+    query.output.filter(attr => tableAttrNames.exists(conf.resolver(_, 
attr.name)))
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both 
resolved.")
+
+    // take into account only incoming data columns and ignore metadata 
columns in the query
+    // they will be discarded after the logical write is built in the optimizer
+    // metadata columns may be needed to request a correct distribution or 
ordering
+    // but are not passed back to the data source during writes
+
+    table.skipSchemaResolution || (dataInput.size == table.output.size &&

Review Comment:
   It may be redundant in case of DELETE but it will be required for UPDATE and 
MERGE when the incoming values no longer solely depend on what was read. This 
will prevent setting nullable values for non-nullable attributes, for instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to