Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15292#discussion_r82537727
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala ---
@@ -17,47 +17,132 @@
package org.apache.spark.sql.execution.datasources.jdbc
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
/**
* Options for the JDBC data source.
*/
class JDBCOptions(
@transient private val parameters: Map[String, String])
extends Serializable {
+ import JDBCOptions._
+
+ def this(url: String, table: String, parameters: Map[String, String]) = {
+ this(parameters ++ Map(
+ JDBCOptions.JDBC_URL -> url,
+ JDBCOptions.JDBC_TABLE_NAME -> table))
+ }
+
+ val asConnectionProperties: Properties = {
+ val properties = new Properties()
+ // We should avoid passing the options into properties. See SPARK-17776.
+ parameters.filterKeys(!jdbcOptionNames.contains(_))
+ .foreach { case (k, v) => properties.setProperty(k, v) }
+ properties
+ }
+
// ------------------------------------------------------------
// Required parameters
// ------------------------------------------------------------
- require(parameters.isDefinedAt("url"), "Option 'url' is required.")
- require(parameters.isDefinedAt("dbtable"), "Option 'dbtable' is
required.")
+ require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is
required.")
+ require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option
'$JDBC_TABLE_NAME' is required.")
// a JDBC URL
- val url = parameters("url")
+ val url = parameters(JDBC_URL)
// name of table
- val table = parameters("dbtable")
+ val table = parameters(JDBC_TABLE_NAME)
+
+ // ------------------------------------------------------------
+ // Optional parameters
+ // ------------------------------------------------------------
+ val driverClass = {
+ val userSpecifiedDriverClass = parameters.get(JDBC_DRIVER_CLASS)
+ userSpecifiedDriverClass.foreach(DriverRegistry.register)
+
+ // Performing this part of the logic on the driver guards against the corner-case where the
+ // driver returned for a URL is different on the driver and executors due to classpath
+ // differences.
+ userSpecifiedDriverClass.getOrElse {
+ DriverManager.getDriver(url).getClass.getCanonicalName
+ }
+ }
// ------------------------------------------------------------
- // Optional parameter list
+ // Optional parameters only for reading
// ------------------------------------------------------------
// the column used to partition
- val partitionColumn = parameters.getOrElse("partitionColumn", null)
+ val partitionColumn = parameters.getOrElse(JDBC_PARTITION_COLUMN, null)
// the lower bound of partition column
- val lowerBound = parameters.getOrElse("lowerBound", null)
+ val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null)
// the upper bound of the partition column
- val upperBound = parameters.getOrElse("upperBound", null)
+ val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null)
// the number of partitions
- val numPartitions = parameters.getOrElse("numPartitions", null)
-
+ val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null)
require(partitionColumn == null ||
(lowerBound != null && upperBound != null && numPartitions != null),
- "If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
- " and 'numPartitions' are required.")
+ s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND',
'$JDBC_UPPER_BOUND'," +
+ s" and '$JDBC_NUM_PARTITIONS' are required.")
+ val fetchSize = {
+ val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
+ require(size >= 0,
+ s"Invalid value `${size.toString}` for parameter " +
+ s"`$JDBC_BATCH_FETCH_SIZE`. The minimum value is 0. When the value
is 0, " +
+ "the JDBC driver ignores the value and does the estimates.")
+ size
+ }
// ------------------------------------------------------------
- // The options for DataFrameWriter
+ // Optional parameters only for writing
// ------------------------------------------------------------
// if to truncate the table from the JDBC database
- val isTruncate = parameters.getOrElse("truncate", "false").toBoolean
+ val isTruncate = parameters.getOrElse(JDBC_TRUNCATE, "false").toBoolean
// the create table option, which can be table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
// TODO: to reuse the existing partition parameters for those partition specific options
- val createTableOptions = parameters.getOrElse("createTableOptions", "")
+ val createTableOptions = parameters.getOrElse(JDBC_CREATE_TABLE_OPTIONS, "")
+ val batchSize = {
+ val size = parameters.getOrElse(JDBC_BATCH_INSERT_SIZE, "1000").toInt
+ require(size >= 1,
+ s"Invalid value `${size.toString}` for parameter " +
+ s"`$JDBC_BATCH_INSERT_SIZE`. The minimum value is 1.")
+ size
+ }
+ val isolationLevel =
+ parameters.getOrElse(JDBC_TXN_ISOLATION_LEVEL, "READ_UNCOMMITTED") match {
+ case "NONE" => Connection.TRANSACTION_NONE
+ case "READ_UNCOMMITTED" => Connection.TRANSACTION_READ_UNCOMMITTED
+ case "READ_COMMITTED" => Connection.TRANSACTION_READ_COMMITTED
+ case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
+ case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
+ }
+}
+
+object JDBCOptions {
+ val JDBC_URL = "url"
+ val JDBC_TABLE_NAME = "dbtable"
+ val JDBC_DRIVER_CLASS = "driver"
+ val JDBC_PARTITION_COLUMN = "partitionColumn"
+ val JDBC_LOWER_BOUND = "lowerBound"
+ val JDBC_UPPER_BOUND = "upperBound"
+ val JDBC_NUM_PARTITIONS = "numPartitions"
+ val JDBC_BATCH_FETCH_SIZE = "fetchsize"
+ val JDBC_TRUNCATE = "truncate"
+ val JDBC_CREATE_TABLE_OPTIONS = "createTableOptions"
+ val JDBC_BATCH_INSERT_SIZE = "batchsize"
+ val JDBC_TXN_ISOLATION_LEVEL = "isolationLevel"
+
+ private val jdbcOptionNames = Seq(
--- End diff --
@cloud-fan Are you suggesting we use `getDeclaredFields`?
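For reference, a rough, untested sketch of what I understand that to mean: instead of repeating every constant in `jdbcOptionNames`, collect all `JDBC_*` string constants from this object via reflection. The `lazy val`/`Set` shape and the name-prefix filter below are only illustration, not code from this PR:

```scala
object JDBCOptions {
  val JDBC_URL = "url"
  val JDBC_TABLE_NAME = "dbtable"
  val JDBC_DRIVER_CLASS = "driver"
  // ... remaining JDBC_* constants as in the diff above ...

  // Sketch only: getDeclaredFields returns the private backing fields of the
  // vals declared on this object; keep the String-typed ones that follow the
  // JDBC_ naming convention and read their values reflectively.
  private lazy val jdbcOptionNames: Set[String] =
    this.getClass.getDeclaredFields
      .filter(f => f.getName.startsWith("JDBC_") && f.getType == classOf[String])
      .map { f =>
        f.setAccessible(true) // the backing fields are private
        f.get(this).asInstanceOf[String]
      }
      .toSet
}
```

That would avoid keeping the list in sync by hand, at the cost of relying on a naming convention and a bit of reflection when the object is initialized.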