gaborgsomogyi commented on a change in pull request #29024:
URL: https://github.com/apache/spark/pull/29024#discussion_r754476290
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
##########
@@ -18,60 +18,45 @@
package org.apache.spark.sql.execution.datasources.jdbc.connection
import java.sql.{Connection, Driver}
-import java.util.Properties
+import java.util.ServiceLoader
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
-
-/**
- * Connection provider which opens connection toward various databases (database specific instance
- * needed). If kerberos authentication required then it's the provider's responsibility to set all
- * the parameters.
- */
-private[jdbc] trait ConnectionProvider {
- /**
- * Additional properties for data connection (Data source property takes precedence).
- */
- def getAdditionalProperties(): Properties = new Properties()
+import scala.collection.mutable
- /**
- * Opens connection toward the database.
- */
- def getConnection(): Connection
-}
+import org.apache.spark.internal.Logging
+import org.apache.spark.security.SecurityConfigurationLock
+import org.apache.spark.sql.jdbc.JdbcConnectionProvider
+import org.apache.spark.util.Utils
private[jdbc] object ConnectionProvider extends Logging {
- def create(driver: Driver, options: JDBCOptions): ConnectionProvider = {
- if (options.keytab == null || options.principal == null) {
- logDebug("No authentication configuration found, using basic connection provider")
- new BasicConnectionProvider(driver, options)
- } else {
- logDebug("Authentication configuration found, using database specific connection provider")
- options.driverClass match {
- case PostgresConnectionProvider.driverClass =>
- logDebug("Postgres connection provider found")
- new PostgresConnectionProvider(driver, options)
-
- case MariaDBConnectionProvider.driverClass =>
- logDebug("MariaDB connection provider found")
- new MariaDBConnectionProvider(driver, options)
-
- case DB2ConnectionProvider.driverClass =>
- logDebug("DB2 connection provider found")
- new DB2ConnectionProvider(driver, options)
-
- case MSSQLConnectionProvider.driverClass =>
- logDebug("MS SQL connection provider found")
- new MSSQLConnectionProvider(driver, options)
-
- case OracleConnectionProvider.driverClass =>
- logDebug("Oracle connection provider found")
- new OracleConnectionProvider(driver, options)
-
- case _ =>
- throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " +
- "Kerberos authentication")
+ private val providers = loadProviders()
+
+ def loadProviders(): Seq[JdbcConnectionProvider] = {
+ val loader = ServiceLoader.load(classOf[JdbcConnectionProvider],
+ Utils.getContextOrSparkClassLoader)
+ val providers = mutable.ArrayBuffer[JdbcConnectionProvider]()
+
+ val iterator = loader.iterator
+ while (iterator.hasNext) {
+ try {
+ val provider = iterator.next
+ logDebug(s"Loaded built in provider: $provider")
+ providers += provider
+ } catch {
+ case t: Throwable =>
+ logError(s"Failed to load built in provider.", t)
}
}
+ // Seems duplicate but it's needed for Scala 2.13
+ providers.toSeq
+ }
+
+ def create(driver: Driver, options: Map[String, String]): Connection = {
+ val filteredProviders = providers.filter(_.canHandle(driver, options))
+ require(filteredProviders.size == 1,
+ "JDBC connection initiated but not exactly one connection provider found which can handle " +
+ s"it. Found active providers: ${filteredProviders.mkString(", ")}")
+ SecurityConfigurationLock.synchronized {
Review comment:
> is getConnection() not thread-safe?
Simply put, no.
This was thought through from the start, even though it introduces a
bottleneck. The main problem is that the JVM has a single security context. To
make a connection, one MUST modify that security context; otherwise the JDBC
connector is not able to provide the security credentials (TGT, keytab or
whatever). Simply put, JDBC connectors can only get credentials from that
single security context.
Since multiple connections are made concurrently (sometimes to the same DB
type w/ different credentials), this must be synchronized so that the security
context does not end up malformed (we ran a load test with 10+ DBs and
corrupted the security context pretty fast w/o sync, and it was a horror to
find out what was going on).
Workloads come and go, and nobody can tell in advance, at JVM startup, what
kind of security context will need to be created. If we pre-baked the security
context (not sure how?!), the following issues would come up:
* How to handle two databases of the same type (for example MariaDB) with two
different credentials?
* How to handle a DB's login credentials changing over time?
I personally think the first one can't be solved. The second one could be
cured by restarting all JVMs, but I think that's just not user friendly.
If somebody has an excellent idea, I would like to hear it.
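To make the single-security-context argument concrete, here is a minimal
sketch of the pattern. It assumes the "security context" is the JVM-wide JAAS
`javax.security.auth.login.Configuration`, which the comment does not name
explicitly. `SecurityContextSketch`, `Lock`, `SingleEntryConfiguration` and
`withSecurityContext` are hypothetical names made up for illustration, and
restoring the previous configuration afterwards is a choice of this sketch,
not something the diff shows.

```scala
import java.util.{HashMap => JHashMap}
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag

object SecurityContextSketch {
  // Hypothetical stand-in for the SecurityConfigurationLock used in the diff.
  private object Lock

  // Hypothetical configuration: answers for one entry name and delegates
  // every other lookup to whatever configuration was installed before.
  final class SingleEntryConfiguration(
      parent: Configuration,
      entryName: String,
      keytab: String,
      principal: String) extends Configuration {

    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
      if (name == entryName) {
        val options = new JHashMap[String, String]()
        options.put("useKeyTab", "true")
        options.put("keyTab", keytab)
        options.put("principal", principal)
        Array(new AppConfigurationEntry(
          "com.sun.security.auth.module.Krb5LoginModule",
          LoginModuleControlFlag.REQUIRED,
          options))
      } else if (parent != null) {
        parent.getAppConfigurationEntry(name)
      } else {
        null
      }
    }
  }

  // Installs the JVM-global configuration, runs `connect`, then restores the
  // previous one. All of it happens under one lock, so two callers with
  // different credentials can never observe each other's configuration.
  def withSecurityContext[T](entryName: String, keytab: String, principal: String)(
      connect: => T): T = Lock.synchronized {
    val previous =
      try Configuration.getConfiguration
      catch { case _: SecurityException => null } // no configuration installed yet
    Configuration.setConfiguration(
      new SingleEntryConfiguration(previous, entryName, keytab, principal))
    try connect
    finally Configuration.setConfiguration(previous)
  }
}
```

Because `Configuration.setConfiguration` replaces a static, process-wide
value, dropping the `Lock.synchronized` here would let one thread overwrite
the entry another thread is about to log in with, which is the kind of
corruption described above. Two MariaDB connections with different
principals work only because each one installs its own entry while holding
the lock.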
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.