Timothy Zhang created SPARK-29185:
-------------------------------------

             Summary: Add new SaveMode types for Spark SQL jdbc datasource
                 Key: SPARK-29185
                 URL: https://issues.apache.org/jira/browse/SPARK-29185
             Project: Spark
          Issue Type: New Feature
          Components: Input/Output
    Affects Versions: 2.4.4
            Reporter: Timothy Zhang


 It is necessary to add new SaveMode values for Delete, Update, and Upsert, such as:
 * SaveMode.Delete
 * SaveMode.Update
 * SaveMode.Upsert

So that Spark SQL could support legacy RDBMSs much better, e.g. Oracle, DB2,
MySQL etc. Actually, the code implementation of the current SaveMode.Append
type is very flexible. All types could share the same savePartition function;
we only need to add new getStatement functions for Delete, Update, and Upsert
with the SQL statements DELETE FROM, UPDATE, and MERGE INTO respectively. We
have an initial implementation for them:
{code:java}
/**
 * Builds a parameterized `DELETE` statement that matches a row on every column of the schema.
 *
 * The generated SQL has the shape `DELETE FROM <TABLE> WHERE <c1>=? AND <c2>=? ...`, with one
 * `?` placeholder per field of `rddSchema`, in schema order.
 *
 * @param table     target table name; it is upper-cased before being placed in the statement
 * @param rddSchema schema whose field names become the `WHERE` predicates
 * @param dialect   dialect used to quote each column identifier
 * @return the SQL text, ready to be prepared with positional parameters
 */
def getDeleteStatement(table: String, rddSchema: StructType, dialect: JdbcDialect): String = {
  // One `<quoted column>=?` predicate per schema field.
  // NOTE(review): in SQL, `col = ?` is never true when the bound value is NULL, so rows that
  // contain NULLs in any column will not be matched by this statement — confirm that is intended.
  val predicates = rddSchema.fields
    .map(field => dialect.quoteIdentifier(field.name) + "=?")
    .mkString(" AND ")

  // Upper-case with Locale.ROOT so the result does not depend on the JVM default locale
  // (e.g. under a Turkish locale a plain toUpperCase maps 'i' to the dotted capital 'İ').
  s"DELETE FROM ${table.toUpperCase(java.util.Locale.ROOT)} WHERE $predicates"
}

  /**
   * Builds a parameterized `UPDATE` statement that sets every non-key column and selects the
   * row by its primary-key columns.
   *
   * The generated SQL has the shape `UPDATE <TABLE> SET <n1>=?,<n2>=? WHERE <k1>=? AND <k2>=?`.
   * Placeholders therefore appear in this order: first the non-key columns (schema order), then
   * the key columns (in the order given by `priKeys`).
   *
   * @param table     target table name; it is upper-cased before being placed in the statement
   * @param rddSchema schema whose field names are the candidate columns
   * @param priKeys   names of the primary-key columns used in the `WHERE` clause
   * @param dialect   dialect used to quote each column identifier
   * @return the SQL text, ready to be prepared with positional parameters
   */
  def getUpdateStatement(
      table: String,
      rddSchema: StructType,
      priKeys: Seq[String],
      dialect: JdbcDialect): String = {
    val quotedAll = rddSchema.fields.map(field => dialect.quoteIdentifier(field.name))
    val quotedKeys = priKeys.map(dialect.quoteIdentifier(_))

    // Every schema column that is not a key is assigned in the SET clause.
    val assignments = (quotedAll diff quotedKeys).map(_ + "=?").mkString(",")
    val conditions = quotedKeys.map(_ + "=?").mkString(" AND ")

    // NOTE(review): the placeholder order here (non-key columns, then keys) differs from plain
    // schema order — confirm that whatever binds the parameters follows the same order.
    // Upper-case with Locale.ROOT so the result does not depend on the JVM default locale.
    s"UPDATE ${table.toUpperCase(java.util.Locale.ROOT)} SET $assignments WHERE $conditions"
  }

  /**
   * Builds a parameterized `MERGE INTO` (upsert) statement.
   *
   * A single row of `?` placeholders (one per schema column, in schema order) is exposed as the
   * source alias `SRC` and joined to the target alias `TGT` on the primary-key columns. When no
   * target row matches, the source row is inserted; when one matches, its non-key columns are
   * overwritten with the source values.
   *
   * @param table     target table name; it is upper-cased before being placed in the statement
   * @param rddSchema schema whose field names are the merged columns
   * @param priKeys   names of the primary-key columns used in the `ON` condition
   * @param dialect   dialect used to quote column identifiers and the `SRC`/`TGT` aliases
   * @return the SQL text, ready to be prepared with positional parameters
   */
  def getMergeStatement(
      table: String,
      rddSchema: StructType,
      priKeys: Seq[String],
      dialect: JdbcDialect): String = {
    val src = dialect.quoteIdentifier("SRC")
    val tgt = dialect.quoteIdentifier("TGT")

    val fullCols = rddSchema.fields.map(field => dialect.quoteIdentifier(field.name))
    val priCols = priKeys.map(dialect.quoteIdentifier(_))
    val nrmCols = fullCols diff priCols

    val columns = fullCols.mkString(",")
    val placeholders = fullCols.map(_ => "?").mkString(",")
    val sourceValues = fullCols.map(c => s"$src.$c").mkString(",")
    // The separator must be the single-line literal " AND "; a literal broken across lines
    // either fails to compile or injects a newline into the generated SQL.
    val joinCondition = priCols.map(c => s"$tgt.$c=$src.$c").mkString(" AND ")

    // With no non-key columns there is nothing to assign, and an empty `UPDATE SET` list would
    // be malformed SQL, so the WHEN MATCHED clause is emitted only when it has assignments.
    val updateClause =
      if (nrmCols.isEmpty) {
        ""
      } else {
        " WHEN MATCHED THEN UPDATE SET " + nrmCols.map(c => s"$c=$src.$c").mkString(",")
      }

    // NOTE(review): the `USING TABLE(VALUES(...))` form may not be accepted by every RDBMS —
    // confirm against each target dialect.
    // Upper-case with Locale.ROOT so the result does not depend on the JVM default locale.
    s"MERGE INTO ${table.toUpperCase(java.util.Locale.ROOT)} AS $tgt " +
      s"USING TABLE(VALUES($placeholders)) " +
      s"AS $src($columns) " +
      s"ON $joinCondition " +
      s"WHEN NOT MATCHED THEN INSERT ($columns) VALUES ($sourceValues)" +
      updateClause
  }
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to