huaxingao commented on code in PR #44119:
URL: https://github.com/apache/spark/pull/44119#discussion_r1429645287
##########
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala:
##########
@@ -167,6 +173,241 @@ final class DataFrameWriterV2[T] private[sql](table:
String, ds: Dataset[T])
runCommand(overwrite)
}
+ /**
+ * Specifies the merge condition.
+ *
+ * Sets the condition, provided as a `String`, to be used for merging data.
This condition
+ * is converted internally to a `Column` and used to determine how rows from
the source
+ * DataFrame are matched with rows in the target table.
+ *
+ * @param condition a `String` representing the merge condition.
+ * @return the current `DataFrameWriterV2` instance with the specified merge
condition set.
+ */
+ def on(condition: String): DataFrameWriterV2[T] = {
+ on(Column(condition))
+ }
+
+ /**
+ * Specifies the merge condition.
+ *
+ * Sets the condition to be used for merging data. This condition is used to
determine
+ * how rows from the source DataFrame are matched with rows in the target
table.
+ *
+ * @param condition a `Column` representing the merge condition.
+ * @return the current `DataFrameWriterV2` instance with the specified merge
condition set.
+ */
+ def on(condition: Column): DataFrameWriterV2[T] = {
+ this.on = Some(condition)
+ this
+ }
+
+ /**
+ * Initialize a `WhenMatched` action without any condition.
+ *
+ * This `WhenMatched` can be followed by one of the following merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @return a new `WhenMatched` object.
+ */
+ def whenMatched(): WhenMatched[T] = {
+ new WhenMatched[T](this, None)
+ }
+
+ /**
+ * Initialize a `WhenMatched` action with a condition.
+ *
+ * This `WhenMatched` action will be executed if and only if the specified
`condition`
+ * is satisfied.
+ *
+ * This `WhenMatched` can be followed by one of the following merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `Column` representing the condition to be evaluated
for the action.
+ * @return a new `WhenMatched` object configured with the specified
condition.
+ */
+ def whenMatched(condition: Column): WhenMatched[T] = {
+ new WhenMatched[T](this, Some(condition.expr))
+ }
+
+ /**
+ * Initialize a `WhenMatched` action with a specified condition.
+ *
+ * This `WhenMatched` action will be executed if and only if the given
`condition`
+ * is satisfied. The condition is represented as a `String` and internally
converted
+ * to a `Column`.
+ *
+ * The `WhenMatched` instance can perform one of the following merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `String` representing a column name which specifies
the condition
+ * to be evaluated for the action.
+ * @return a new `WhenMatched` object configured with the specified
condition.
+ */
+ def whenMatched(condition: String): WhenMatched[T] = {
+ whenMatched(Column(condition))
+ }
+
+ /**
+ * Initialize a `WhenNotMatched` action without any condition.
+ *
+ * This `WhenNotMatched` can be followed by one of the following merge
actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @return a new `WhenNotMatched` object.
+ */
+ def whenNotMatched(): WhenNotMatched[T] = {
+ new WhenNotMatched[T](this, None)
+ }
+
+ /**
+ * Initialize a `WhenNotMatched` action with a condition.
+ *
+ * This `WhenNotMatched` action will be executed if and only if the
specified `condition`
+ * is satisfied.
+ *
+ * This `WhenNotMatched` can be followed by one of the following merge
actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `Column` representing the condition to be evaluated
for the action.
+ * @return a new `WhenNotMatched` object configured with the specified
condition.
+ */
+ def whenNotMatched(condition: Column): WhenNotMatched[T] = {
+ new WhenNotMatched[T](this, Some(condition.expr))
+ }
+
+ /**
+ * Initialize a `WhenNotMatched` action with a condition.
+ *
+ * This `WhenNotMatched` action will be executed if and only if the
specified `condition`
+ * is satisfied. The condition is represented as a `String` and internally
converted
+ * to a `Column`.
+ *
+ * This `WhenNotMatched` can be followed by one of the following merge
actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `String` representing a column name which specifies
the condition
+ * to be evaluated for the action.
+ * @return a new `WhenNotMatched` object configured with the specified
condition.
+ */
+ def whenNotMatched(condition: String): WhenNotMatched[T] = {
+ whenNotMatched(Column(condition))
+ }
+
+ /**
+ * Initialize a `WhenNotMatchedBySource` action without any condition.
+ *
+ * This `WhenNotMatchedBySource` can be followed by one of the following
merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @return a new `WhenNotMatchedBySource` object.
+ */
+ def whenNotMatchedBySource(): WhenNotMatchedBySource[T] = {
+ new WhenNotMatchedBySource[T](this, None)
+ }
+
+ /**
+ * Initialize a `WhenNotMatchedBySource` action with a condition.
+ *
+ * This `WhenNotMatchedBySource` action will be executed if and only if the
specified `condition`
+ * is satisfied.
+ *
+ * This `WhenNotMatchedBySource` can be followed by one of the following
merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `Column` representing the condition to be evaluated
for the action.
+ * @return a new `WhenNotMatchedBySource` object configured with the
specified condition.
+ */
+ def whenNotMatchedBySource(condition: Column): WhenNotMatchedBySource[T] = {
+ new WhenNotMatchedBySource[T](this, Some(condition.expr))
+ }
+
+ /**
+ * Initialize a `WhenNotMatchedBySource` action with a condition.
+ *
+ * This `WhenNotMatchedBySource` action will be executed if and only if the
specified `condition`
+ * is satisfied. The condition is represented as a `String` and internally
converted
+ * to a `Column`.
+ *
+ * This `WhenNotMatchedBySource` can be followed by one of the following
merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `String` representing a column name which specifies
the condition
+ * to be evaluated for the action.
+ * @return a new `WhenNotMatchedBySource` object configured with the
specified condition.
+ */
+ def whenNotMatchedBySource(condition: String): WhenNotMatchedBySource[T] = {
+ whenNotMatchedBySource(Column(condition))
+ }
+
+ /**
+ * Executes the merge operation.
+ */
+ def merge(): Unit = {
+ if (on.isEmpty) {
+ throw new IllegalStateException("The 'on' condition cannot be None")
Review Comment:
Changed to `mergeInto(table: String, condition: Column)`
##########
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala:
##########
@@ -167,6 +173,59 @@ final class DataFrameWriterV2[T] private[sql](table:
String, ds: Dataset[T])
runCommand(overwrite)
}
+ def on(condition: Column): MergeIntoWriter[T] = {
+ this.on = Some(condition)
+ this
+ }
+
+ def whenMatched(): WhenMatched[T] = {
+ new WhenMatched[T](this, None)
+ }
+
+ def whenMatched(condition: Column): WhenMatched[T] = {
+ new WhenMatched[T](this, Some(condition.expr))
+ }
+
+ def whenNotMatched(): WhenNotMatched[T] = {
+ new WhenNotMatched[T](this, None)
+ }
+
+ def whenNotMatched(condition: Column): WhenNotMatched[T] = {
+ new WhenNotMatched[T](this, Some(condition.expr))
+ }
+
+ def whenNotMatchedBySource(): WhenNotMatchedBySource[T] = {
+ new WhenNotMatchedBySource[T](this, None)
+ }
+
+ def whenNotMatchedBySource(condition: Column): WhenNotMatchedBySource[T] = {
+ new WhenNotMatchedBySource[T](this, Some(condition.expr))
+ }
+
+ /**
+ * Executes the merge operation.
+ */
+ def merge(): Unit = {
+ if (on.isEmpty) {
+ throw new IllegalStateException("The 'on' condition cannot be None")
+ }
+
+ if (matchedActions.isEmpty && notMatchedActions.isEmpty &&
notMatchedBySourceActions.isEmpty) {
+ throw new IllegalStateException("At least one of matchedActions,
notMatchedActions," +
Review Comment:
A new error was added.
##########
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala:
##########
@@ -343,3 +402,312 @@ trait CreateTableWriter[T] extends
WriteConfigMethods[CreateTableWriter[T]] {
*/
def tableProperty(property: String, value: String): CreateTableWriter[T]
}
+
+trait MergeIntoWriter[T] {
+
+ /**
+ * Specifies the merge condition.
+ *
+ * Sets the condition to be used for merging data. This condition is used to
determine
+ * how rows from the source DataFrame are matched with rows in the target
table.
+ *
+ * @param condition a `Column` representing the merge condition.
+ * @return the current `MergeIntoWriter` instance with the specified merge
condition set.
+ */
+ def on(condition: Column): MergeIntoWriter[T]
+
+ /**
+ * Initialize a `WhenMatched` action without any condition.
+ *
+ * This `WhenMatched` can be followed by one of the following merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @return a new `WhenMatched` object.
+ */
+ def whenMatched(): WhenMatched[T]
+
+ /**
+ * Initialize a `WhenMatched` action with a condition.
+ *
+ * This `WhenMatched` action will be executed if and only if the specified
`condition`
+ * is satisfied.
+ *
+ * This `WhenMatched` can be followed by one of the following merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `Column` representing the condition to be evaluated
for the action.
+ * @return a new `WhenMatched` object configured with the specified
condition.
+ */
+ def whenMatched(condition: Column): WhenMatched[T]
+
+ /**
+ * Initialize a `WhenNotMatched` action without any condition.
+ *
+ * This `WhenNotMatched` can be followed by one of the following merge
actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @return a new `WhenNotMatched` object.
+ */
+ def whenNotMatched(): WhenNotMatched[T]
+
+ /**
+ * Initialize a `WhenNotMatched` action with a condition.
+ *
+ * This `WhenNotMatched` action will be executed if and only if the
specified `condition`
+ * is satisfied.
+ *
+ * This `WhenNotMatched` can be followed by one of the following merge
actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `Column` representing the condition to be evaluated
for the action.
+ * @return a new `WhenNotMatched` object configured with the specified
condition.
+ */
+ def whenNotMatched(condition: Column): WhenNotMatched[T]
+
+ /**
+ * Initialize a `WhenNotMatchedBySource` action without any condition.
+ *
+ * This `WhenNotMatchedBySource` can be followed by one of the following
merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @return a new `WhenNotMatchedBySource` object.
+ */
+ def whenNotMatchedBySource(): WhenNotMatchedBySource[T]
+
+ /**
+ * Initialize a `WhenNotMatchedBySource` action with a condition.
+ *
+ * This `WhenNotMatchedBySource` action will be executed if and only if the
specified `condition`
+ * is satisfied.
+ *
+ * This `WhenNotMatchedBySource` can be followed by one of the following
merge actions:
+ * - `updateAll`: Update all the target table fields with source dataset
fields.
+ * - `update(Map)`: Update all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `insertAll`: Insert all the target table with source dataset records.
+ * - `insert(Map)`: Insert all the target table records while changing only
+ * a subset of fields based on the provided assignment.
+ * - `delete`: Delete all the target table records.
+ *
+ * @param condition a `Column` representing the condition to be evaluated
for the action.
+ * @return a new `WhenNotMatchedBySource` object configured with the
specified condition.
+ */
+ def whenNotMatchedBySource(condition: Column): WhenNotMatchedBySource[T]
+}
+
+/**
+ * A class for defining actions to be taken when matching rows in a DataFrame
during
+ * a merge operation.
+ *
+ * @param dfWriter The DataFrameWriterV2 instance responsible for writing
data to a
+ * target DataFrame.
+ * @param condition An optional condition Expression that specifies when the
actions
+ * should be applied.
+ * If the condition is None, the actions will be applied to
all matched rows.
+ *
+ * @tparam T The type of data in the DataFrame.
+ */
+case class WhenMatched[T] (dfWriter: DataFrameWriterV2[T], condition:
Option[Expression]) {
+ /**
+ * Specifies an action to update all matched rows in the DataFrame.
+ *
+ * @return The DataFrameWriterV2 instance with the update all action
configured.
+ */
+ def updateAll(): DataFrameWriterV2[T] = {
+ dfWriter.matchedActions = dfWriter.matchedActions :+
UpdateStarAction(condition)
+ this.dfWriter
+ }
+
+ /**
+ * Specifies an action to update matched rows in the DataFrame with the
provided column
+ * assignments.
+ *
+ * @param set A Map of column names to Column expressions representing the
updates to be applied.
+ * @return The DataFrameWriterV2 instance with the update action configured.
+ */
+ def update(set: Map[String, Column]): DataFrameWriterV2[T] = {
+ dfWriter.matchedActions = dfWriter.matchedActions :+
+ UpdateAction(condition, set.map(x => Assignment(expr(x._1).expr,
x._2.expr)).toSeq)
+ this.dfWriter
+ }
+
+ /**
+ * Specifies an action to delete matched rows from the DataFrame.
+ *
+ * @return The DataFrameWriterV2 instance with the delete action configured.
+ */
+ def delete(): DataFrameWriterV2[T] = {
+ dfWriter.matchedActions = dfWriter.matchedActions :+
DeleteAction(condition)
+ this.dfWriter
+ }
+}
+
+/**
+ * A class for defining actions to be taken when no matching rows are found in
a DataFrame
+ * during a merge operation.
+ *
+ * @param dfWriter The DataFrameWriterV2 instance responsible for writing
data to a
+ * target DataFrame.
+ * @param condition An optional condition Expression that specifies when the
actions
+ * defined in this configuration should be applied.
+ * If the condition is None, the actions will be applied
when there
+ * are no matching rows.
+ *
+ * @tparam T The type of data in the DataFrame.
+ */
+case class WhenNotMatched[T] (dfWriter: DataFrameWriterV2[T], condition:
Option[Expression]) {
Review Comment:
Update and Delete were removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]