aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1022088980


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -254,6 +254,113 @@ case class ReplaceData(
   }
 }
 
+/**
+ * Writes a delta of rows to an existing table during a row-level operation.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations
+ * for data sources that can handle deltas of rows.
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching records
+ * @param query a query with a delta of records that should be written
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param projections projections for row ID, row, and metadata attributes
+ * @param write a logical write, if already constructed
+ */
+case class WriteDelta(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    projections: WriteDeltaProjections,
+    write: Option[DeltaWrite] = None) extends RowLevelWrite {
+
+  override val isByName: Boolean = false
+  override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  override lazy val references: AttributeSet = query.outputSet
+
+  lazy val operation: SupportsDelta = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+        operation.asInstanceOf[SupportsDelta]
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+    }
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both 
resolved.")
+
+    operationResolved && rowAttrsResolved && rowIdAttrsResolved && metadataAttrsResolved
+  }
+
+  private def operationResolved: Boolean = {
+    val attr = query.output.head
+    attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType == IntegerType && !attr.nullable
+  }
+
+  // validates row projection output is compatible with table attributes
+  private def rowAttrsResolved: Boolean = {
+    table.skipSchemaResolution || (projections.rowProjection match {

Review Comment:
   I am using projection schemas instead of `query.output` as those schemas
   will contain precise nullability.
   
   Once we add support for MERGE operations, there will be a node that merges
   matching incoming and existing rows into one. That means one `SparkPlan`
   would contain deletes, updates, and inserts. Metadata columns and row ID
   attributes in `query` will always be nullable, as those columns are null
   for insert rows. However, row IDs and metadata are never passed to sources
   with insert rows in `DeltaWriter`. That's why we need to look at the
   projections, not `query`.
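   
   To make the nullability point concrete, here is a minimal, hypothetical
   sketch using plain `StructType`s (the column names `__row_operation`, `pk`,
   and `value` are made up for illustration; the real check goes through
   `WriteDeltaProjections`, not raw schemas):
   
   ```scala
   import org.apache.spark.sql.types._
   
   // Output of a merged MERGE plan: the row ID column ("pk" here) must be
   // declared nullable because INSERT rows carry no row ID.
   val mergedQueryOutput = StructType(Seq(
     StructField("__row_operation", IntegerType, nullable = false),
     StructField("pk", LongType, nullable = true), // null for INSERT rows
     StructField("value", StringType, nullable = true)
   ))
   
   // The row ID projection is applied only to DELETE/UPDATE rows, so it can
   // keep the precise, non-nullable row ID type. This is the schema that a
   // check like `rowIdAttrsResolved` should compare against the table.
   val rowIdProjectionSchema = StructType(Seq(
     StructField("pk", LongType, nullable = false)
   ))
   
   // A check based on `query.output` would wrongly see pk as nullable:
   assert(mergedQueryOutput("pk").nullable)
   assert(!rowIdProjectionSchema("pk").nullable)
   ```
   
   A compatibility check against `rowIdProjectionSchema` accepts a table whose
   `pk` is non-nullable, whereas the same check against `mergedQueryOutput`
   would reject it.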



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

