This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4828f49ce15a [SPARK-46670][PYTHON][SQL] Make DataSourceManager self clone-able by separating static and runtime Python Data Sources 4828f49ce15a is described below commit 4828f49ce15a45e64c01a389ee806b726df5cdab Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Fri Jan 12 11:24:42 2024 +0900 [SPARK-46670][PYTHON][SQL] Make DataSourceManager self clone-able by separating static and runtime Python Data Sources ### What changes were proposed in this pull request? This PR proposes to make DataSourceManager isolated and self clone-able without actual lookup by separating static and runtime Python Data Sources. ### Why are the changes needed? For better maintenance of the code. Now, we trigger Python execution that actually initializes `SparkSession` via `SQLConf`. There are too many side effects. Also, we should separate static and runtime Python Data Sources in any event. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unittest was added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44681 from HyukjinKwon/SPARK-46670. 
Lead-authored-by: Hyukjin Kwon <gurwls...@apache.org> Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../execution/datasources/DataSourceManager.scala | 34 ++++++++++++++-------- .../datasources/DataSourceManagerSuite.scala | 32 ++++++++++++++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala index 92e602128988..53003989a338 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala @@ -21,8 +21,6 @@ import java.io.File import java.util.Locale import java.util.concurrent.ConcurrentHashMap -import scala.jdk.CollectionConverters._ - import org.apache.spark.api.python.PythonUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.errors.QueryCompilationErrors @@ -34,16 +32,18 @@ import org.apache.spark.util.Utils * A manager for user-defined data sources. It is used to register and lookup data sources by * their short names or fully qualified names. */ -class DataSourceManager extends Logging { +class DataSourceManager( + initDataSourceBuilders: => Option[ + Map[String, UserDefinedPythonDataSource]] = None + ) extends Logging { + import DataSourceManager._ // Lazy to avoid being invoked during Session initialization. // Otherwise, it goes infinite loop, session -> Python runner -> SQLConf -> session. 
- private lazy val dataSourceBuilders = { - val builders = new ConcurrentHashMap[String, UserDefinedPythonDataSource]() - builders.putAll(DataSourceManager.initialDataSourceBuilders.asJava) - builders + private lazy val staticDataSourceBuilders = initDataSourceBuilders.getOrElse { + initialDataSourceBuilders } - private def normalize(name: String): String = name.toLowerCase(Locale.ROOT) + private val dataSourceBuilders = new ConcurrentHashMap[String, UserDefinedPythonDataSource]() /** * Register a data source builder for the given provider. @@ -51,6 +51,10 @@ class DataSourceManager extends Logging { */ def registerDataSource(name: String, source: UserDefinedPythonDataSource): Unit = { val normalizedName = normalize(name) + if (staticDataSourceBuilders.contains(normalizedName)) { + // Cannot overwrite static Python Data Sources. + throw QueryCompilationErrors.dataSourceAlreadyExists(name) + } val previousValue = dataSourceBuilders.put(normalizedName, source) if (previousValue != null) { logWarning(f"The data source $name replaced a previously registered data source.") @@ -63,7 +67,9 @@ class DataSourceManager extends Logging { */ def lookupDataSource(name: String): UserDefinedPythonDataSource = { if (dataSourceExists(name)) { - dataSourceBuilders.get(normalize(name)) + val normalizedName = normalize(name) + staticDataSourceBuilders.getOrElse( + normalizedName, dataSourceBuilders.get(normalizedName)) } else { throw QueryCompilationErrors.dataSourceDoesNotExist(name) } @@ -73,11 +79,13 @@ class DataSourceManager extends Logging { * Checks if a data source with the specified name exists (case-insensitive). 
*/ def dataSourceExists(name: String): Boolean = { - dataSourceBuilders.containsKey(normalize(name)) + val normalizedName = normalize(name) + staticDataSourceBuilders.contains(normalizedName) || + dataSourceBuilders.containsKey(normalizedName) } override def clone(): DataSourceManager = { - val manager = new DataSourceManager + val manager = new DataSourceManager(Some(staticDataSourceBuilders)) dataSourceBuilders.forEach((k, v) => manager.registerDataSource(k, v)) manager } @@ -93,6 +101,8 @@ object DataSourceManager extends Logging { PythonUtils.sparkPythonPaths.forall(new File(_).exists()) } + private def normalize(name: String): String = name.toLowerCase(Locale.ROOT) + private def initialDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = { if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized { if (dataSourceBuilders.isEmpty) { @@ -109,7 +119,7 @@ object DataSourceManager extends Logging { dataSourceBuilders = maybeResult.map { result => result.names.zip(result.dataSources).map { case (name, dataSource) => - name -> + normalize(name) -> UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource)) }.toMap } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceManagerSuite.scala new file mode 100644 index 000000000000..af9775f44f94 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceManagerSuite.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.SparkFunSuite + +class DataSourceManagerSuite extends SparkFunSuite { + test("SPARK-46670: DataSourceManager should be self clone-able without lookup") { + val testAppender = new LogAppender("Cloneable DataSourceManager without lookup") + withLogAppender(testAppender) { + new DataSourceManager().clone() + } + assert(!testAppender.loggingEvents + .exists(msg => + msg.getMessage.getFormattedMessage.contains("Skipping the lookup of Python Data Sources"))) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org