This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4828f49ce15a [SPARK-46670][PYTHON][SQL] Make DataSourceManager self clone-able by separating static and runtime Python Data Sources
4828f49ce15a is described below

commit 4828f49ce15a45e64c01a389ee806b726df5cdab
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Fri Jan 12 11:24:42 2024 +0900

    [SPARK-46670][PYTHON][SQL] Make DataSourceManager self clone-able by separating static and runtime Python Data Sources
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to make DataSourceManager isolated and self clone-able without an actual lookup by separating static and runtime Python Data Sources.
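    
    As a rough, self-contained sketch of the resulting shape (names such as `Manager`, `Source` and `expensiveLookup` are illustrative stand-ins, not the actual Spark API; the real class also normalizes names and throws Spark error classes):
    
        import java.util.concurrent.ConcurrentHashMap
    
        class Manager[Source](initStatic: => Option[Map[String, Source]] = None) {
          // Static sources: computed once, on first use, never at construction.
          private lazy val staticBuilders: Map[String, Source] =
            initStatic.getOrElse(expensiveLookup())
    
          // Runtime sources: registered by users during the session.
          private val runtimeBuilders = new ConcurrentHashMap[String, Source]()
    
          private def expensiveLookup(): Map[String, Source] = {
            println("expensive lookup ran") // stands in for the Python worker call
            Map.empty
          }
    
          def register(name: String, s: Source): Unit = {
            // Static sources cannot be overwritten, only runtime ones replaced.
            require(!staticBuilders.contains(name), s"cannot overwrite $name")
            runtimeBuilders.put(name, s)
          }
    
          def lookup(name: String): Option[Source] =
            staticBuilders.get(name).orElse(Option(runtimeBuilders.get(name)))
    
          override def clone(): Manager[Source] = {
            // `Some(staticBuilders)` feeds a by-name parameter, so it is not
            // evaluated here; the clone stays lazy until its first real use.
            val m = new Manager[Source](Some(staticBuilders))
            runtimeBuilders.forEach((k, v) => m.register(k, v))
            m
          }
        }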
    
    ### Why are the changes needed?
    
    For better maintenance of the code. Now, we trigger Python execution that actually initializes `SparkSession` via `SQLConf`; during session initialization this even loops (session -> Python runner -> SQLConf -> session). There are too many side effects.
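    
    The fix defers that execution with a Scala call-by-name constructor parameter. A tiny illustrative demo of the mechanism (a hypothetical `Holder` class, not Spark code):
    
        class Holder(value: => Option[Int]) {
          // `value` is only evaluated when `v` is first forced.
          lazy val v: Int = value.getOrElse { println("expensive init"); 42 }
        }
    
        val a = new Holder(None)       // prints nothing: no work at construction
        val b = new Holder(Some(a.v))  // still nothing: the argument is by-name
        println(b.v)                   // forces a.v -> "expensive init", then 42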
    
    Also, we should separate static and runtime Python Data Sources in any event.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    A unit test was added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44681 from HyukjinKwon/SPARK-46670.
    
    Lead-authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../execution/datasources/DataSourceManager.scala  | 34 ++++++++++++++--------
 .../datasources/DataSourceManagerSuite.scala       | 32 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
index 92e602128988..53003989a338 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -21,8 +21,6 @@ import java.io.File
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.jdk.CollectionConverters._
-
 import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -34,16 +32,18 @@ import org.apache.spark.util.Utils
  * A manager for user-defined data sources. It is used to register and lookup data sources by
  * their short names or fully qualified names.
  */
-class DataSourceManager extends Logging {
+class DataSourceManager(
+    initDataSourceBuilders: => Option[
+      Map[String, UserDefinedPythonDataSource]] = None
+   ) extends Logging {
+  import DataSourceManager._
   // Lazy to avoid being invoked during Session initialization.
   // Otherwise, it goes infinite loop, session -> Python runner -> SQLConf -> session.
-  private lazy val dataSourceBuilders = {
-    val builders = new ConcurrentHashMap[String, UserDefinedPythonDataSource]()
-    builders.putAll(DataSourceManager.initialDataSourceBuilders.asJava)
-    builders
+  private lazy val staticDataSourceBuilders = initDataSourceBuilders.getOrElse {
+    initialDataSourceBuilders
   }
 
-  private def normalize(name: String): String = name.toLowerCase(Locale.ROOT)
+  private val dataSourceBuilders = new ConcurrentHashMap[String, UserDefinedPythonDataSource]()
 
   /**
    * Register a data source builder for the given provider.
@@ -51,6 +51,10 @@ class DataSourceManager extends Logging {
    */
   def registerDataSource(name: String, source: UserDefinedPythonDataSource): Unit = {
     val normalizedName = normalize(name)
+    if (staticDataSourceBuilders.contains(normalizedName)) {
+      // Cannot overwrite static Python Data Sources.
+      throw QueryCompilationErrors.dataSourceAlreadyExists(name)
+    }
     val previousValue = dataSourceBuilders.put(normalizedName, source)
     if (previousValue != null) {
       logWarning(f"The data source $name replaced a previously registered data source.")
@@ -63,7 +67,9 @@ class DataSourceManager extends Logging {
    */
   def lookupDataSource(name: String): UserDefinedPythonDataSource = {
     if (dataSourceExists(name)) {
-      dataSourceBuilders.get(normalize(name))
+      val normalizedName = normalize(name)
+      staticDataSourceBuilders.getOrElse(
+        normalizedName, dataSourceBuilders.get(normalizedName))
     } else {
       throw QueryCompilationErrors.dataSourceDoesNotExist(name)
     }
@@ -73,11 +79,13 @@ class DataSourceManager extends Logging {
    * Checks if a data source with the specified name exists (case-insensitive).
    */
   def dataSourceExists(name: String): Boolean = {
-    dataSourceBuilders.containsKey(normalize(name))
+    val normalizedName = normalize(name)
+    staticDataSourceBuilders.contains(normalizedName) ||
+      dataSourceBuilders.containsKey(normalizedName)
   }
 
   override def clone(): DataSourceManager = {
-    val manager = new DataSourceManager
+    val manager = new DataSourceManager(Some(staticDataSourceBuilders))
     dataSourceBuilders.forEach((k, v) => manager.registerDataSource(k, v))
     manager
   }
@@ -93,6 +101,8 @@ object DataSourceManager extends Logging {
       PythonUtils.sparkPythonPaths.forall(new File(_).exists())
   }
 
+  private def normalize(name: String): String = name.toLowerCase(Locale.ROOT)
+
   private def initialDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = {
     if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized {
       if (dataSourceBuilders.isEmpty) {
@@ -109,7 +119,7 @@ object DataSourceManager extends Logging {
 
         dataSourceBuilders = maybeResult.map { result =>
           result.names.zip(result.dataSources).map { case (name, dataSource) =>
-            name ->
+            normalize(name) ->
               UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource))
           }.toMap
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceManagerSuite.scala
new file mode 100644
index 000000000000..af9775f44f94
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceManagerSuite.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.SparkFunSuite
+
+class DataSourceManagerSuite extends SparkFunSuite {
+  test("SPARK-46670: DataSourceManager should be self clone-able without 
lookup") {
+    val testAppender = new LogAppender("Cloneable DataSourceManager without lookup")
+    withLogAppender(testAppender) {
+      new DataSourceManager().clone()
+    }
+    assert(!testAppender.loggingEvents
+      .exists(msg =>
+        msg.getMessage.getFormattedMessage.contains("Skipping the lookup of Python Data Sources")))
+  }
+}
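
For context, a hedged usage sketch of the behavior the diff above puts in place (assuming some `ds: UserDefinedPythonDataSource`; names are illustrative):

    val manager = new DataSourceManager()
    val cloned = manager.clone()                    // triggers no Python lookup by itself
    cloned.registerDataSource("my_runtime_ds", ds)  // runtime registration is allowed
    // Registering a name that collides with a statically discovered Python
    // Data Source throws QueryCompilationErrors.dataSourceAlreadyExists.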

