Timothy Zhang created SPARK-29185:
-------------------------------------

Summary: Add new SaveMode types for Spark SQL jdbc datasource
Key: SPARK-29185
URL: https://issues.apache.org/jira/browse/SPARK-29185
Project: Spark
Issue Type: New Feature
Components: Input/Output
Affects Versions: 2.4.4
Reporter: Timothy Zhang
It is necessary to add new SaveMode types for Delete, Update, and Upsert:

* SaveMode.Delete
* SaveMode.Update
* SaveMode.Upsert

so that Spark SQL can better support legacy RDBMSs, e.g. Oracle, DB2, MySQL, etc.

The current implementation of SaveMode.Append is already very flexible: all of these modes could share the same savePartition function, and only new getStatement functions would need to be added for Delete, Update, and Upsert, using the SQL statements DELETE FROM, UPDATE, and MERGE INTO respectively. We have an initial implementation for them:

{code:java}
/**
 * Builds a parameterized `DELETE` statement that removes rows equal to the incoming row
 * in every column of `rddSchema`.
 *
 * One `?` placeholder is emitted per schema field, in schema order, so the statement can
 * be bound column-by-column in the same order as the schema.
 *
 * NOTE: the predicate uses `col=?`, which never matches SQL NULL; rows that contain NULL
 * in any column will not be deleted.
 *
 * @param table     target table name, used verbatim (not case-folded)
 * @param rddSchema schema of the rows to delete; must have at least one field
 * @param dialect   dialect used to quote column identifiers
 * @return the `DELETE FROM ... WHERE ...` statement
 */
def getDeleteStatement(table: String, rddSchema: StructType, dialect: JdbcDialect): String = {
  // An empty schema would otherwise produce a dangling "WHERE".
  require(rddSchema.fields.nonEmpty, "Cannot build a DELETE statement for an empty schema")
  val conditions = rddSchema.fields
    .map(field => s"${dialect.quoteIdentifier(field.name)}=?")
    .mkString(" AND ")
  // The table name is used as given (as for INSERT): upper-casing it breaks quoted or
  // case-sensitive names, and databases that fold unquoted names do so themselves.
  s"DELETE FROM $table WHERE $conditions"
}

/**
 * Builds a parameterized `UPDATE` statement that sets every non-key column and matches
 * rows on the primary-key columns.
 *
 * Placeholder order is: all non-key columns (in schema order) followed by the key columns
 * (in `priKeys` order). This differs from schema order unless the keys are the trailing
 * schema fields, so callers must bind values in that order.
 *
 * @param table     target table name, used verbatim (not case-folded)
 * @param rddSchema schema of the incoming rows
 * @param priKeys   primary-key column names; must be non-empty and present in `rddSchema`
 * @param dialect   dialect used to quote column identifiers
 * @return the `UPDATE ... SET ... WHERE ...` statement
 * @throws IllegalArgumentException if `priKeys` is empty, names a column missing from the
 *                                  schema, or covers every column (nothing left to SET)
 */
def getUpdateStatement(
    table: String,
    rddSchema: StructType,
    priKeys: Seq[String],
    dialect: JdbcDialect): String = {
  val fullCols = rddSchema.fields.map(field => dialect.quoteIdentifier(field.name)).toSeq
  val priCols = priKeys.map(dialect.quoteIdentifier)
  require(priCols.nonEmpty, "UPDATE requires at least one primary-key column")
  // A key absent from the schema (e.g. differing in case) would silently survive `diff`
  // and produce a statement whose placeholders no longer match the row.
  val missing = priKeys.filterNot(key => fullCols.contains(dialect.quoteIdentifier(key)))
  require(missing.isEmpty, s"Primary-key columns not found in schema: ${missing.mkString(", ")}")
  val nrmCols = fullCols diff priCols
  require(nrmCols.nonEmpty, "UPDATE requires at least one non-key column to set")
  val assignments = nrmCols.map(col => s"$col=?").mkString(",")
  val conditions = priCols.map(col => s"$col=?").mkString(" AND ")
  s"UPDATE $table SET $assignments WHERE $conditions"
}

/**
 * Builds a parameterized upsert (`MERGE INTO`) statement: inserts the incoming row when
 * no row matches on the primary-key columns, otherwise updates its non-key columns.
 *
 * One `?` placeholder is emitted per schema field, in schema order.
 *
 * NOTE(review): the `MERGE INTO t AS alias USING TABLE(VALUES(...)) AS alias(cols)` form
 * appears to be DB2-style syntax. Oracle does not accept `AS` for table aliases nor
 * `TABLE(VALUES ...)`, and MySQL has no `MERGE` (it uses
 * `INSERT ... ON DUPLICATE KEY UPDATE`), so those need dialect-specific statements.
 *
 * @param table     target table name, used verbatim (not case-folded)
 * @param rddSchema schema of the incoming rows
 * @param priKeys   primary-key column names; must be non-empty and present in `rddSchema`
 * @param dialect   dialect used to quote identifiers
 * @return the `MERGE INTO ...` statement
 * @throws IllegalArgumentException if `priKeys` is empty or names a column missing from
 *                                  the schema
 */
def getMergeStatement(
    table: String,
    rddSchema: StructType,
    priKeys: Seq[String],
    dialect: JdbcDialect): String = {
  val fullCols = rddSchema.fields.map(field => dialect.quoteIdentifier(field.name)).toSeq
  val priCols = priKeys.map(dialect.quoteIdentifier)
  require(priCols.nonEmpty, "MERGE requires at least one primary-key column")
  val missing = priKeys.filterNot(key => fullCols.contains(dialect.quoteIdentifier(key)))
  require(missing.isEmpty, s"Primary-key columns not found in schema: ${missing.mkString(", ")}")
  val nrmCols = fullCols diff priCols

  val tgt = dialect.quoteIdentifier("TGT")
  val src = dialect.quoteIdentifier("SRC")
  val columns = fullCols.mkString(",")
  val placeholders = fullCols.map(_ => "?").mkString(",")
  val srcValues = fullCols.map(col => s"$src.$col").mkString(",")
  val onClause = priCols.map(col => s"$tgt.$col=$src.$col").mkString(" AND ")
  // When every column is a key there is nothing to update; an empty "UPDATE SET" list
  // is invalid SQL, so the WHEN MATCHED clause is omitted and matches are left as-is.
  val whenMatched =
    if (nrmCols.isEmpty) ""
    else " WHEN MATCHED THEN UPDATE SET " + nrmCols.map(col => s"$col=$src.$col").mkString(",")

  s"MERGE INTO $table AS $tgt " +
    s"USING TABLE(VALUES($placeholders)) " +
    s"AS $src($columns) " +
    s"ON $onClause " +
    s"WHEN NOT MATCHED THEN INSERT ($columns) VALUES ($srcValues)" +
    whenMatched
}
{code}

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org