xianyinxin commented on a change in pull request #25626: [SPARK-28892][SQL] Add
UPDATE support for DataSource V2
URL: https://github.com/apache/spark/pull/25626#discussion_r320627347
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
##########
@@ -245,6 +246,28 @@ object DataSourceV2Strategy extends Strategy with
PredicateHelper {
}.toArray
DeleteFromTableExec(r.table.asDeletable, filters) :: Nil
+ case UpdateTable(r: DataSourceV2Relation, attrs, values, condition) =>
+ val nested =
attrs.asInstanceOf[Seq[Any]].filterNot(_.isInstanceOf[AttributeReference])
+ if (nested.nonEmpty) {
+ throw new AnalysisException(s"Update only support non-nested fields.
Nested: $nested")
+ }
+
+ val attrsNames = attrs.map(_.name)
+ // fail if any updated value cannot be converted.
+ val updatedValues = values.map {
+ v => DataSourceStrategy.translateExpression(v).getOrElse(
+ throw new AnalysisException(s"Exec update failed:" +
+ s" cannot translate update set to source expression: $v"))
+ }
+ // fail if any filter cannot be converted. correctness depends on
updating exactly the matching data.
+ val filters = condition.map(
+ splitConjunctivePredicates(_).map {
+ f => DataSourceStrategy.translateFilter(f).getOrElse(
+ throw new AnalysisException(s"Exec update failed:" +
+ s" cannot translate expression to source filter: $f"))
+ }.toArray).getOrElse(Array.empty[Filter])
+ UpdateTableExec(r.table.asUpdatable, attrsNames, updatedValues,
filters)::Nil
Review comment:
Fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]