cloud-fan commented on code in PR #35395:
URL: https://github.com/apache/spark/pull/35395#discussion_r847970408
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -176,6 +178,77 @@ object OverwritePartitionsDynamic {
}
}
+trait RowLevelWrite extends V2WriteCommand with SupportsSubquery {
+ def operation: RowLevelOperation
+ def condition: Expression
+ def originalTable: NamedRelation
+}
+
+/**
+ * Replace groups of data in an existing table during a row-level operation.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
+ * that can replace groups of data (e.g. files, partitions).
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching groups
+ * @param query a query with records that should replace the records that were read
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param write a logical write, if already constructed
+ */
+case class ReplaceData(
+ table: NamedRelation,
+ condition: Expression,
+ query: LogicalPlan,
+ originalTable: NamedRelation,
+ write: Option[Write] = None) extends RowLevelWrite {
+
+ override lazy val isByName: Boolean = false
+ override lazy val references: AttributeSet = query.outputSet
+ override lazy val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+ lazy val operation: RowLevelOperation = {
+ EliminateSubqueryAliases(table) match {
+ case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+ operation
+ case _ =>
+ throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+ }
+ }
+
+ // the incoming query may include metadata columns
+ lazy val dataInput: Seq[Attribute] = {
+ val tableAttrNames = table.output.map(_.name)
+ query.output.filter(attr => tableAttrNames.exists(conf.resolver(_, attr.name)))
+ }
+
+ override def outputResolved: Boolean = {
+ assert(table.resolved && query.resolved,
+ "`outputResolved` can only be called when `table` and `query` are both resolved.")
+
+ // take into account only incoming data columns and ignore metadata columns in the query
+ // they will be discarded after the logical write is built in the optimizer
+ // metadata columns may be needed to request a correct distribution or ordering
+ // but are not passed back to the data source during writes
+
+ table.skipSchemaResolution || (dataInput.size == table.output.size &&
Review Comment:
Do we really need to check this? The input query is built by Spark and
reads directly from the table.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]