[jira] [Updated] (SPARK-29185) Add new SaveMode types for Spark SQL jdbc datasource

2020-03-17 Thread Dongjoon Hyun (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-29185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-29185:
--
Affects Version/s: (was: 3.0.0)
   3.1.0

> Add new SaveMode types for Spark SQL jdbc datasource
> 
>
> Key: SPARK-29185
> URL: https://issues.apache.org/jira/browse/SPARK-29185
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Timothy Zhang
>Priority: Major
>
> It is necessary to add new SaveMode types for Delete, Update, and Upsert, such as:
>  * SaveMode.Delete
>  * SaveMode.Update
>  * SaveMode.Upsert
> This would let Spark SQL support traditional RDBMSs such as Oracle, DB2, and 
> MySQL much better. The implementation of the current SaveMode.Append path is 
> flexible enough that all of these types could share the same savePartition 
> function; only new getStatement functions would need to be added for Delete, 
> Update, and Upsert, generating DELETE FROM, UPDATE, and MERGE INTO statements 
> respectively. We have an initial implementation of them:
> {code:java}
> // Generate a parameterized DELETE statement keyed on every column of the row schema.
> def getDeleteStatement(table: String, rddSchema: StructType, dialect: JdbcDialect): String = {
>   val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name) + "=?").mkString(" AND ")
>   s"DELETE FROM ${table.toUpperCase} WHERE $columns"
> }
>
> // Generate a parameterized UPDATE statement: non-key columns in SET, primary keys in WHERE.
> def getUpdateStatement(table: String, rddSchema: StructType, priKeys: Seq[String], dialect: JdbcDialect): String = {
>   val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>   val priCols = priKeys.map(dialect.quoteIdentifier(_))
>   val columns = (fullCols diff priCols).map(_ + "=?").mkString(",")
>   val cnditns = priCols.map(_ + "=?").mkString(" AND ")
>   s"UPDATE ${table.toUpperCase} SET $columns WHERE $cnditns"
> }
>
> // Generate a parameterized MERGE INTO (upsert) statement: insert the row when the
> // primary key does not match, update the non-key columns when it does.
> def getMergeStatement(table: String, rddSchema: StructType, priKeys: Seq[String], dialect: JdbcDialect): String = {
>   val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>   val priCols = priKeys.map(dialect.quoteIdentifier(_))
>   val nrmCols = fullCols diff priCols
>   val fullPart = fullCols.map(c => s"${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>   val priPart = priCols.map(c => s"${dialect.quoteIdentifier("TGT")}.$c=${dialect.quoteIdentifier("SRC")}.$c").mkString(" AND ")
>   val nrmPart = nrmCols.map(c => s"$c=${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>   val columns = fullCols.mkString(",")
>   val placeholders = fullCols.map(_ => "?").mkString(",")
>   s"MERGE INTO ${table.toUpperCase} AS ${dialect.quoteIdentifier("TGT")} " +
>     s"USING TABLE(VALUES($placeholders)) " +
>     s"AS ${dialect.quoteIdentifier("SRC")}($columns) " +
>     s"ON $priPart " +
>     s"WHEN NOT MATCHED THEN INSERT ($columns) VALUES ($fullPart) " +
>     s"WHEN MATCHED THEN UPDATE SET $nrmPart"
> }
> {code}
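> For illustration, here is a minimal sketch of what these helpers would produce 
> for a small, made-up schema, assuming the three functions above are in scope. 
> The table and column names are hypothetical, and the exact quoting depends on 
> the JdbcDialect in use:
> {code:java}
> import org.apache.spark.sql.jdbc.JdbcDialects
> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
>
> // Hypothetical schema and table, used only to show the generated SQL.
> val schema = StructType(Seq(
>   StructField("ORDER_ID", IntegerType),
>   StructField("CUSTOMER", StringType),
>   StructField("AMOUNT", IntegerType)))
> val dialect = JdbcDialects.get("jdbc:db2://db2host:50000/SAMPLE")
>
> println(getDeleteStatement("sales", schema, dialect))
> // e.g. DELETE FROM SALES WHERE "ORDER_ID"=? AND "CUSTOMER"=? AND "AMOUNT"=?
> println(getUpdateStatement("sales", schema, Seq("ORDER_ID"), dialect))
> // e.g. UPDATE SALES SET "CUSTOMER"=?,"AMOUNT"=? WHERE "ORDER_ID"=?
> println(getMergeStatement("sales", schema, Seq("ORDER_ID"), dialect))
> // e.g. MERGE INTO SALES AS "TGT" USING TABLE(VALUES(?,?,?)) AS "SRC"("ORDER_ID","CUSTOMER","AMOUNT") ...
> {code}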



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-29185) Add new SaveMode types for Spark SQL jdbc datasource

2019-09-24 Thread Hyukjin Kwon (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-29185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-29185:
-
Component/s: (was: Input/Output)
 SQL

> Add new SaveMode types for Spark SQL jdbc datasource
> 
>
> Key: SPARK-29185
> URL: https://issues.apache.org/jira/browse/SPARK-29185
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: Timothy Zhang
>Priority: Major