[jira] [Updated] (SPARK-29185) Add new SaveMode types for Spark SQL jdbc datasource
[ https://issues.apache.org/jira/browse/SPARK-29185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-29185:
----------------------------------
    Affects Version/s: 3.1.0 (was: 3.0.0)

> Add new SaveMode types for Spark SQL jdbc datasource
> ----------------------------------------------------
>
>                 Key: SPARK-29185
>                 URL: https://issues.apache.org/jira/browse/SPARK-29185
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Timothy Zhang
>            Priority: Major
>
> It is necessary to add new SaveMode types for Delete, Update, and Upsert, such as:
> * SaveMode.Delete
> * SaveMode.Update
> * SaveMode.Upsert
> so that Spark SQL can better support legacy RDBMSs such as Oracle, DB2, and
> MySQL. The current implementation of SaveMode.Append is very flexible: all of
> the new types could share the same savePartition function, and only new
> getStatement functions would need to be added for Delete, Update, and Upsert,
> generating DELETE FROM, UPDATE, and MERGE INTO statements respectively. We have
> an initial implementation of them:
> {code:java}
> // DELETE: match a row on every column in the schema.
> def getDeleteStatement(table: String, rddSchema: StructType, dialect: JdbcDialect): String = {
>   val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name) + "=?").mkString(" AND ")
>   s"DELETE FROM ${table.toUpperCase} WHERE $columns"
> }
>
> // UPDATE: set the non-key columns, match on the primary-key columns.
> def getUpdateStatement(table: String, rddSchema: StructType, priKeys: Seq[String], dialect: JdbcDialect): String = {
>   val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>   val priCols = priKeys.map(dialect.quoteIdentifier(_))
>   val columns = (fullCols diff priCols).map(_ + "=?").mkString(",")
>   val cnditns = priCols.map(_ + "=?").mkString(" AND ")
>   s"UPDATE ${table.toUpperCase} SET $columns WHERE $cnditns"
> }
>
> // UPSERT: MERGE INTO keyed on the primary-key columns.
> def getMergeStatement(table: String, rddSchema: StructType, priKeys: Seq[String], dialect: JdbcDialect): String = {
>   val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>   val priCols = priKeys.map(dialect.quoteIdentifier(_))
>   val nrmCols = fullCols diff priCols
>   val fullPart = fullCols.map(c => s"${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>   val priPart = priCols.map(c =>
>     s"${dialect.quoteIdentifier("TGT")}.$c=${dialect.quoteIdentifier("SRC")}.$c").mkString(" AND ")
>   val nrmPart = nrmCols.map(c => s"$c=${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>   val columns = fullCols.mkString(",")
>   val placeholders = fullCols.map(_ => "?").mkString(",")
>   s"MERGE INTO ${table.toUpperCase} AS ${dialect.quoteIdentifier("TGT")} " +
>     s"USING TABLE(VALUES($placeholders)) " +
>     s"AS ${dialect.quoteIdentifier("SRC")}($columns) " +
>     s"ON $priPart " +
>     s"WHEN NOT MATCHED THEN INSERT ($columns) VALUES ($fullPart) " +
>     s"WHEN MATCHED THEN UPDATE SET $nrmPart"
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
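
To make the proposal above concrete, here is a minimal sketch of the strings that the DELETE and UPDATE builders in the issue would produce for a small example table. It is only an illustration under stated assumptions: it does not depend on Spark, `quote` is a hypothetical stand-in for `JdbcDialect.quoteIdentifier` (assumed here to wrap names in double quotes), column names are passed as a plain `Seq[String]` instead of a `StructType`, and the `people` table and its columns are made up.

```scala
object StatementSketch {
  // Hypothetical stand-in for JdbcDialect.quoteIdentifier; double-quote style is an assumption.
  def quote(name: String): String = "\"" + name + "\""

  // Same construction as getDeleteStatement in the issue: every column becomes a condition.
  def deleteStatement(table: String, cols: Seq[String]): String = {
    val conditions = cols.map(c => quote(c) + "=?").mkString(" AND ")
    s"DELETE FROM ${table.toUpperCase} WHERE $conditions"
  }

  // Same construction as getUpdateStatement in the issue: non-key columns are set,
  // primary-key columns form the WHERE clause.
  def updateStatement(table: String, cols: Seq[String], priKeys: Seq[String]): String = {
    val fullCols = cols.map(quote)
    val priCols = priKeys.map(quote)
    val assignments = (fullCols diff priCols).map(_ + "=?").mkString(",")
    val conditions = priCols.map(_ + "=?").mkString(" AND ")
    s"UPDATE ${table.toUpperCase} SET $assignments WHERE $conditions"
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq("id", "name", "age") // made-up example schema
    println(deleteStatement("people", cols))
    // DELETE FROM PEOPLE WHERE "id"=? AND "name"=? AND "age"=?
    println(updateStatement("people", cols, Seq("id")))
    // UPDATE PEOPLE SET "name"=?,"age"=? WHERE "id"=?
  }
}
```

One thing the sketch makes visible: in the UPDATE statement the placeholders appear in the order (non-key columns, then key columns), which differs from the schema order whenever a key column is not last. If a shared savePartition binds parameters in schema order (an assumption about how it would be reused), the row values would need to be reordered to match.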
[jira] [Updated] (SPARK-29185) Add new SaveMode types for Spark SQL jdbc datasource
[ https://issues.apache.org/jira/browse/SPARK-29185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-29185:
---------------------------------
    Component/s: SQL (was: Input/Output)

> Add new SaveMode types for Spark SQL jdbc datasource
> ----------------------------------------------------
>
>                 Key: SPARK-29185
>                 URL: https://issues.apache.org/jira/browse/SPARK-29185
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.4.4
>            Reporter: Timothy Zhang
>            Priority: Major