MaxGekk commented on a change in pull request #29324:
URL: https://github.com/apache/spark/pull/29324#discussion_r463942055
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
##########
@@ -908,4 +909,25 @@ object JdbcUtils extends Logging {
statement.close()
}
}
+
+ /**
+ * Update a table from the JDBC database.
+ */
+ def alterTable(
+ conn: Connection,
+ tableName: String,
+ changes: Seq[TableChange],
+ options: JDBCOptions
+ ): Unit = {
+ val dialect = JdbcDialects.get(options.url)
+ val statement = conn.createStatement
+ try {
+ statement.setQueryTimeout(options.queryTimeout)
Review comment:
Other methods have similar code:
```scala
val statement = ...
try {
statement.setQueryTimeout(options.queryTimeout)
statement.execute ...
} finally {
statement.close()
}
```
Could you extract it into a private method?
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -184,15 +189,56 @@ abstract class JdbcDialect extends Serializable {
/**
* Rename an existing table.
*
- * TODO (SPARK-32382): Override this method in the dialects that don't
support such syntax.
- *
* @param oldTable The existing table.
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
def renameTable(oldTable: String, newTable: String): String = {
s"ALTER TABLE $oldTable RENAME TO $newTable"
}
+
+ /**
+ * Alter an existing table.
+ *
+ * @param tableName The name of the table to be altered.
+ * @param changes Changes to apply to the table.
+ * @return The SQL statement to use for altering the table.
+ */
+ def alterTable(tableName: String, changes: Seq[TableChange]): Array[String]
= {
+ val updateClause = mutable.ArrayBuilder.make[String]
+ for (change <- changes) {
+ change match {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(name) =>
+ val dataType = JdbcUtils.getJdbcType(add.dataType(),
this).databaseTypeDefinition
+ updateClause += s"ALTER TABLE $tableName ADD COLUMN $name
$dataType"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${add.fieldNames}")
+ }
+ case rename: RenameColumn =>
+ rename.fieldNames match {
+ case Array(name) =>
+ updateClause += s"ALTER TABLE $tableName RENAME COLUMN $name TO
${rename.newName}"
+ case _ =>
Review comment:
It is not clear to me when this happens. Could you write a test for this
case?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
##########
@@ -908,4 +909,25 @@ object JdbcUtils extends Logging {
statement.close()
}
}
+
+ /**
+ * Update a table from the JDBC database.
+ */
+ def alterTable(
+ conn: Connection,
+ tableName: String,
+ changes: Seq[TableChange],
+ options: JDBCOptions
+ ): Unit = {
Review comment:
```suggestion
options: JDBCOptions): Unit = {
```
see https://github.com/databricks/scala-style-guide#spacing-and-indentation
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -184,15 +189,56 @@ abstract class JdbcDialect extends Serializable {
/**
* Rename an existing table.
*
- * TODO (SPARK-32382): Override this method in the dialects that don't
support such syntax.
- *
* @param oldTable The existing table.
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
def renameTable(oldTable: String, newTable: String): String = {
s"ALTER TABLE $oldTable RENAME TO $newTable"
}
+
+ /**
+ * Alter an existing table.
+ *
+ * @param tableName The name of the table to be altered.
+ * @param changes Changes to apply to the table.
+ * @return The SQL statement to use for altering the table.
Review comment:
It returns multiple SQL statements, correct? And the mapping of input
changes to SQL statements isn't 1<->1. Could you correct this?
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -184,15 +189,56 @@ abstract class JdbcDialect extends Serializable {
/**
* Rename an existing table.
*
- * TODO (SPARK-32382): Override this method in the dialects that don't
support such syntax.
- *
* @param oldTable The existing table.
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
def renameTable(oldTable: String, newTable: String): String = {
s"ALTER TABLE $oldTable RENAME TO $newTable"
}
+
+ /**
+ * Alter an existing table.
+ *
+ * @param tableName The name of the table to be altered.
+ * @param changes Changes to apply to the table.
+ * @return The SQL statement to use for altering the table.
+ */
+ def alterTable(tableName: String, changes: Seq[TableChange]): Array[String]
= {
+ val updateClause = mutable.ArrayBuilder.make[String]
+ for (change <- changes) {
+ change match {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(name) =>
+ val dataType = JdbcUtils.getJdbcType(add.dataType(),
this).databaseTypeDefinition
+ updateClause += s"ALTER TABLE $tableName ADD COLUMN $name
$dataType"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${add.fieldNames}")
+ }
+ case rename: RenameColumn =>
+ rename.fieldNames match {
+ case Array(name) =>
+ updateClause += s"ALTER TABLE $tableName RENAME COLUMN $name TO
${rename.newName}"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${rename.fieldNames}")
+ }
+ case delete: DeleteColumn =>
+ delete.fieldNames match {
+ case Array(name) =>
+ updateClause += s"ALTER TABLE $tableName DROP COLUMN $name"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${delete.fieldNames}")
+ }
+ case _ => throw new IllegalArgumentException(s"JDBC alterTable has
Unsupported" +
Review comment:
I think `throw new NotImplementedError` fits better for this case.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
##########
@@ -106,4 +106,58 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
Seq(Row("test", "people"), Row("test", "new_table")))
}
}
+
+ test("alter table ... add column") {
+ withTable("h2.test.alt_table") {
+ withConnection { conn =>
+ conn.prepareStatement("""CREATE TABLE "test"."alt_table" (id
INTEGER)""").executeUpdate()
+ }
Review comment:
Could you use the JDBC Table Catalog functionality instead:
```scala
sql("CREATE TABLE h2.test.alt_table ...")
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -19,9 +19,14 @@ package org.apache.spark.sql.jdbc
import java.sql.{Connection, Date, Timestamp}
+import scala.collection.mutable
Review comment:
nit: `import scala.collection.mutable.ArrayBuilder`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
##########
@@ -908,4 +909,25 @@ object JdbcUtils extends Logging {
statement.close()
}
}
+
+ /**
+ * Update a table from the JDBC database.
+ */
+ def alterTable(
+ conn: Connection,
+ tableName: String,
+ changes: Seq[TableChange],
+ options: JDBCOptions
+ ): Unit = {
+ val dialect = JdbcDialects.get(options.url)
+ val statement = conn.createStatement
+ try {
+ statement.setQueryTimeout(options.queryTimeout)
+ for (sql <- dialect.alterTable(tableName, changes)) {
+ statement.executeUpdate(sql)
+ }
Review comment:
What happens if one of the statements fails? Do we leave the table in a
partially modified state? Should we perform all the statements atomically?
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -184,15 +189,56 @@ abstract class JdbcDialect extends Serializable {
/**
* Rename an existing table.
*
- * TODO (SPARK-32382): Override this method in the dialects that don't
support such syntax.
- *
* @param oldTable The existing table.
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
def renameTable(oldTable: String, newTable: String): String = {
s"ALTER TABLE $oldTable RENAME TO $newTable"
}
+
+ /**
+ * Alter an existing table.
+ *
+ * @param tableName The name of the table to be altered.
+ * @param changes Changes to apply to the table.
+ * @return The SQL statement to use for altering the table.
+ */
+ def alterTable(tableName: String, changes: Seq[TableChange]): Array[String]
= {
+ val updateClause = mutable.ArrayBuilder.make[String]
+ for (change <- changes) {
+ change match {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(name) =>
+ val dataType = JdbcUtils.getJdbcType(add.dataType(),
this).databaseTypeDefinition
+ updateClause += s"ALTER TABLE $tableName ADD COLUMN $name
$dataType"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${add.fieldNames}")
+ }
+ case rename: RenameColumn =>
+ rename.fieldNames match {
+ case Array(name) =>
+ updateClause += s"ALTER TABLE $tableName RENAME COLUMN $name TO
${rename.newName}"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${rename.fieldNames}")
+ }
+ case delete: DeleteColumn =>
+ delete.fieldNames match {
+ case Array(name) =>
+ updateClause += s"ALTER TABLE $tableName DROP COLUMN $name"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${delete.fieldNames}")
+ }
+ case _ => throw new IllegalArgumentException(s"JDBC alterTable has
Unsupported" +
Review comment:
Could you create sub-tasks for other changes like `UpdateColumnNullability`
and add a TODO here if you are not going to implement them in this PR?
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -184,15 +189,56 @@ abstract class JdbcDialect extends Serializable {
/**
* Rename an existing table.
*
- * TODO (SPARK-32382): Override this method in the dialects that don't
support such syntax.
- *
* @param oldTable The existing table.
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
def renameTable(oldTable: String, newTable: String): String = {
s"ALTER TABLE $oldTable RENAME TO $newTable"
}
+
+ /**
+ * Alter an existing table.
+ *
+ * @param tableName The name of the table to be altered.
+ * @param changes Changes to apply to the table.
+ * @return The SQL statement to use for altering the table.
+ */
+ def alterTable(tableName: String, changes: Seq[TableChange]): Array[String]
= {
+ val updateClause = mutable.ArrayBuilder.make[String]
+ for (change <- changes) {
+ change match {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(name) =>
+ val dataType = JdbcUtils.getJdbcType(add.dataType(),
this).databaseTypeDefinition
+ updateClause += s"ALTER TABLE $tableName ADD COLUMN $name
$dataType"
+ case _ =>
Review comment:
`add.fieldNames` always returns `Array[String]`. It seems this default
case shouldn't happen.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -184,15 +189,56 @@ abstract class JdbcDialect extends Serializable {
/**
* Rename an existing table.
*
- * TODO (SPARK-32382): Override this method in the dialects that don't
support such syntax.
- *
* @param oldTable The existing table.
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
def renameTable(oldTable: String, newTable: String): String = {
s"ALTER TABLE $oldTable RENAME TO $newTable"
}
+
+ /**
+ * Alter an existing table.
+ *
+ * @param tableName The name of the table to be altered.
+ * @param changes Changes to apply to the table.
+ * @return The SQL statement to use for altering the table.
+ */
+ def alterTable(tableName: String, changes: Seq[TableChange]): Array[String]
= {
+ val updateClause = mutable.ArrayBuilder.make[String]
+ for (change <- changes) {
+ change match {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(name) =>
+ val dataType = JdbcUtils.getJdbcType(add.dataType(),
this).databaseTypeDefinition
+ updateClause += s"ALTER TABLE $tableName ADD COLUMN $name
$dataType"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${add.fieldNames}")
+ }
+ case rename: RenameColumn =>
+ rename.fieldNames match {
+ case Array(name) =>
+ updateClause += s"ALTER TABLE $tableName RENAME COLUMN $name TO
${rename.newName}"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${rename.fieldNames}")
+ }
+ case delete: DeleteColumn =>
+ delete.fieldNames match {
+ case Array(name) =>
+ updateClause += s"ALTER TABLE $tableName DROP COLUMN $name"
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported TableChange
fieldNames" +
+ s" ${delete.fieldNames}")
+ }
+ case _ => throw new IllegalArgumentException(s"JDBC alterTable has
Unsupported" +
+ s" TableChange ${change}")
Review comment:
```suggestion
s" TableChange $change")
```
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
##########
@@ -106,4 +106,58 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
Seq(Row("test", "people"), Row("test", "new_table")))
}
}
+
+ test("alter table ... add column") {
+ withTable("h2.test.alt_table") {
+ withConnection { conn =>
+ conn.prepareStatement("""CREATE TABLE "test"."alt_table" (id
INTEGER)""").executeUpdate()
+ }
+ assert(sql("DESCRIBE TABLE
h2.test.alt_table").select("col_name").take(1) === Seq(Row("ID")))
+ sql("ALTER TABLE h2.test.alt_table ADD COLUMNS (c1 INTEGER, c2 STRING)")
+ assert(sql("DESCRIBE TABLE
h2.test.alt_table").select("col_name").take(3) ===
+ Seq(Row("ID"), Row("C1"), Row("C2")))
Review comment:
Could you add a helper function that checks whether a column exists in a table?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]