cloud-fan commented on code in PR #44119:
URL: https://github.com/apache/spark/pull/44119#discussion_r1429453554


##########
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala:
##########
@@ -343,3 +402,312 @@ trait CreateTableWriter[T] extends 
WriteConfigMethods[CreateTableWriter[T]] {
    */
   def tableProperty(property: String, value: String): CreateTableWriter[T]
 }
+
+trait MergeIntoWriter[T] {
+
+  /**
+   * Specifies the merge condition.
+   *
+   * Sets the condition to be used for merging data. This condition is used to 
determine
+   * how rows from the source DataFrame are matched with rows in the target 
table.
+   *
+   * @param condition a `Column` representing the merge condition.
+   * @return the current `DataFrameWriterV2` instance with the specified merge 
condition set.
+   */
+  def on(condition: Column): MergeIntoWriter[T]
+
+  /**
+   * Initialize a `WhenMatched` action without any condition.
+   *
+   * This `WhenMatched` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @return a new `WhenMatched` object.
+   */
+  def whenMatched(): WhenMatched[T]
+
+  /**
+   * Initialize a `WhenMatched` action with a condition.
+   *
+   * This `WhenMatched` action will be executed if and only if the specified 
`condition`
+   * is satisfied.
+   *
+   * This `WhenMatched` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `Column` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenMatched` object configured with the specified 
condition.
+   */
+  def whenMatched(condition: Column): WhenMatched[T]
+
+  /**
+   * Initialize a `WhenNotMatched` action without any condition.
+   *
+   * This `WhenNotMatched` can be followed by one of the following merge 
actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @return a new `WhenNotMatched` object.
+   */
+  def whenNotMatched(): WhenNotMatched[T]
+
+  /**
+   * Initialize a `WhenNotMatched` action with a condition.
+   *
+   * This `WhenNotMatched` action will be executed if and only if the 
specified `condition`
+   * is satisfied.
+   *
+   * This `WhenNotMatched` can be followed by one of the following merge 
actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `Column` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenNotMatched` object configured with the specified 
condition.
+   */
+  def whenNotMatched(condition: Column): WhenNotMatched[T]
+
+  /**
+   * Initialize a `WhenNotMatchedBySource` action without any condition.
+   *
+   * This `WhenNotMatchedBySource` can be followed by one of the following 
merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @return a new `WhenNotMatchedBySource` object.
+   */
+  def whenNotMatchedBySource(): WhenNotMatchedBySource[T]
+
+  /**
+   * Initialize a `WhenNotMatchedBySource` action with a condition.
+   *
+   * This `WhenNotMatchedBySource` action will be executed if and only if the 
specified `condition`
+   * is satisfied.
+   *
+   * This `WhenNotMatchedBySource` can be followed by one of the following 
merge actions:
+   *   - `updateAll`: Update all the target table fields with source dataset 
fields.
+   *   - `update(Map)`: Update all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `insertAll`: Insert all the target table with source dataset records.
+   *   - `insert(Map)`: Insert all the target table records while changing only
+   *     a subset of fields based on the provided assignment.
+   *   - `delete`: Delete all the target table records.
+   *
+   * @param condition a `Column` representing the condition to be evaluated 
for the action.
+   * @return a new `WhenNotMatchedBySource` object configured with the 
specified condition.
+   */
+  def whenNotMatchedBySource(condition: Column): WhenNotMatchedBySource[T]
+}
+
+/**
+ * A class for defining actions to be taken when matching rows in a DataFrame 
during
+ * an update operation.
+ *
+ * @param dfWriter   The DataFrameWriterV2 instance responsible for writing 
data to a
+ *                   target DataFrame.
+ * @param condition  An optional condition Expression that specifies when the 
actions
+ *                   should be applied.
+ *                   If the condition is None, the actions will be applied to 
all matched rows.
+ *
+ * @tparam T         The type of data in the DataFrame.
+ */
+case class WhenMatched[T] (dfWriter: DataFrameWriterV2[T], condition: 
Option[Expression]) {
+  /**
+   * Specifies an action to update all matched rows in the DataFrame.
+   *
+   * @return The DataFrameWriterV2 instance with the update all action 
configured.
+   */
+  def updateAll(): DataFrameWriterV2[T] = {
+    dfWriter.matchedActions = dfWriter.matchedActions :+ 
UpdateStarAction(condition)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to update matched rows in the DataFrame with the 
provided column
+   * assignments.
+   *
+   * @param set A Map of column names to Column expressions representing the 
updates to be applied.
+   * @return The DataFrameWriterV2 instance with the update action configured.
+   */
+  def update(set: Map[String, Column]): DataFrameWriterV2[T] = {
+    dfWriter.matchedActions = dfWriter.matchedActions :+
+      UpdateAction(condition, set.map(x => Assignment(expr(x._1).expr, 
x._2.expr)).toSeq)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to delete matched rows from the DataFrame.
+   *
+   * @return The DataFrameWriterV2 instance with the delete action configured.
+   */
+  def delete(): DataFrameWriterV2[T] = {
+    dfWriter.matchedActions = dfWriter.matchedActions :+ 
DeleteAction(condition)
+    this.dfWriter
+  }
+}
+
+/**
+ * A class for defining actions to be taken when no matching rows are found in 
a DataFrame
+ * during an update operation.
+ *
+ * @param dfWriter   The DataFrameWriterV2 instance responsible for writing 
data to a
+ *                   target DataFrame.
+ * @param condition  An optional condition Expression that specifies when the 
actions
+ *                   defined in this configuration should be applied.
+ *                   If the condition is None, the actions will be applied 
when there
+ *                   are no matching rows.
+ *
+ * @tparam T         The type of data in the DataFrame.
+ */
+case class WhenNotMatched[T] (dfWriter: DataFrameWriterV2[T], condition: 
Option[Expression]) {
+  /**
+   * Specifies an action to update all non-matched rows in the DataFrame.
+   *
+   * @return The DataFrameWriterV2 instance with the update all action 
configured.
+   */
+  def updateAll(): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedActions = dfWriter.notMatchedActions :+ 
UpdateStarAction(condition)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to update non-matched rows in the DataFrame with the 
provided
+   * column assignments.
+   *
+   * @param set A Map of column names to Column expressions representing the 
updates to be applied.
+   * @return The DataFrameWriterV2 instance with the update action configured.
+   */
+  def update(set: Map[String, Column]): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedActions = dfWriter.notMatchedActions :+
+      UpdateAction(condition, set.map(x => Assignment(expr(x._1).expr, 
x._2.expr)).toSeq)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to insert all non-matched rows into the DataFrame.
+   *
+   * @return The DataFrameWriterV2 instance with the insert all action 
configured.
+   */
+  def insertAll(): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedActions = dfWriter.notMatchedActions :+ 
InsertStarAction(condition)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to insert non-matched rows into the DataFrame with 
the provided
+   * column assignments.
+   *
+   * @param set A Map of column names to Column expressions representing the 
values to be inserted.
+   * @return The DataFrameWriterV2 instance with the insert action configured.
+   */
+  def insert(set: Map[String, Column]): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedActions = dfWriter.notMatchedActions :+
+      InsertAction(condition, set.map(x => Assignment(expr(x._1).expr, 
x._2.expr)).toSeq)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to delete non-matched rows from the DataFrame.
+   *
+   * @return The DataFrameWriterV2 instance with the delete action configured.
+   */
+  def delete(): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedActions = dfWriter.notMatchedActions :+ 
DeleteAction(condition)
+    this.dfWriter
+  }
+}
+
+/**
+ * A class for defining actions to be performed when there is no match by 
source
+ * during a merge operation in a DataFrameWriterV2.
+ *
+ * @param dfWriter the DataFrameWriterV2 instance to which the merge actions 
will be applied.
+ * @param condition an optional condition to be used with the merge actions.
+ * @tparam T the type parameter for the DataFrameWriterV2.
+ */
+case class WhenNotMatchedBySource[T] (
+    dfWriter: DataFrameWriterV2[T],
+    condition: Option[Expression]) {
+
+  /**
+   * Specifies an action to update all non-matched rows in the target 
DataFrame when
+   * not matched by the source.
+   *
+   * @return The DataFrameWriterV2 instance with the update all action 
configured.
+   */
+  def updateAll(): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedBySourceActions =
+      dfWriter.notMatchedBySourceActions :+ UpdateStarAction(condition)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to update non-matched rows in the target DataFrame 
with the provided
+   * column assignments when not matched by the source.
+   *
+   * @param set A Map of column names to Column expressions representing the 
updates to be applied.
+   * @return The DataFrameWriterV2 instance with the update action configured.
+   */
+  def update(set: Map[String, Column]): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedBySourceActions = dfWriter.notMatchedBySourceActions :+
+      UpdateAction(condition, set.map(x => Assignment(expr(x._1).expr, 
x._2.expr)).toSeq)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to insert all non-matched rows into the target 
DataFrame when not
+   * matched by the source.
+   *
+   * @return The DataFrameWriterV2 instance with the insert all action 
configured.
+   */
+  def insertAll(): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedBySourceActions =
+      dfWriter.notMatchedBySourceActions :+ InsertStarAction(condition)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to insert non-matched rows into the target DataFrame 
with the provided
+   * column assignments when not matched by the source.
+   *
+   * @param set A Map of column names to Column expressions representing the 
values to be inserted.
+   * @return The DataFrameWriterV2 instance with the insert action configured.
+   */
+  def insert(set: Map[String, Column]): DataFrameWriterV2[T] = {
+    dfWriter.notMatchedBySourceActions = dfWriter.notMatchedBySourceActions :+
+      InsertAction(condition, set.map(x => Assignment(expr(x._1).expr, 
x._2.expr)).toSeq)
+    this.dfWriter
+  }
+
+  /**
+   * Specifies an action to delete non-matched rows from the target DataFrame 
when not matched by
+   * the source.
+   *
+   * @return The DataFrameWriterV2 instance with the delete action configured.
+   */
+  def delete(): DataFrameWriterV2[T] = {

Review Comment:
   ditto, the SQL grammar does not have a DELETE clause in
   `WHEN NOT MATCHED BY SOURCE`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to