cloud-fan commented on code in PR #53627:
URL: https://github.com/apache/spark/pull/53627#discussion_r2649403502
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -2153,122 +2153,122 @@ class SessionCatalog(
}
/**
- * Look up the `ExpressionInfo` of the given function by name if it's a
persistent function.
- * This supports both scalar and table functions.
+ * Look up a persistent function's ExpressionInfo by name (for DESCRIBE
FUNCTION).
+ * This only fetches metadata without loading resources or creating builders.
*/
def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
val qualifiedIdent = qualifyIdentifier(name)
- val db = qualifiedIdent.database.get
- val funcName = qualifiedIdent.funcName
+ // Check if already cached in either registry
functionRegistry.lookupFunction(qualifiedIdent)
.orElse(tableFunctionRegistry.lookupFunction(qualifiedIdent))
.getOrElse {
- requireDbExists(db)
- if (externalCatalog.functionExists(db, funcName)) {
- val metadata = externalCatalog.getFunction(db, funcName)
- if (metadata.isUserDefinedFunction) {
- UserDefinedFunction.fromCatalogFunction(metadata,
parser).toExpressionInfo
- } else {
- makeExprInfoForHiveFunction(metadata.copy(identifier =
qualifiedIdent))
- }
+ val funcMetadata = fetchCatalogFunction(qualifiedIdent)
+ if (funcMetadata.isUserDefinedFunction) {
+ UserDefinedFunction.fromCatalogFunction(funcMetadata,
parser).toExpressionInfo
} else {
- failFunctionLookup(name)
+ makeExprInfoForHiveFunction(funcMetadata)
}
}
}
/**
- * Look up a persistent scalar function by name and resolves it to an
Expression.
+ * Load a persistent scalar function by name.
+ * Returns V1Function with:
+ * - Eager info (from cache or catalog fetch, no resource loading)
+ * - Lazy builder (resource loading only on first invoke)
+ *
+ * This matches V1 behavior where DESCRIBE doesn't load resources.
*/
- def resolvePersistentFunction(
- name: FunctionIdentifier, arguments: Seq[Expression]): Expression = {
- resolvePersistentFunctionInternal[Expression](
- name,
- arguments,
- functionRegistry,
- registerHiveFunc = func =>
- registerFunction(
- func,
- overrideIfExists = false,
- registry = functionRegistry,
- functionBuilder = makeFunctionBuilder(func)
- ),
- registerUserDefinedFunc = function => {
- val builder = makeUserDefinedScalarFuncBuilder(function)
- registerUserDefinedFunction[Expression](
- function = function,
- overrideIfExists = false,
- registry = functionRegistry,
- functionBuilder = builder)
- }
- )
+ def loadPersistentScalarFunction(name: FunctionIdentifier): V1Function = {
+ val qualifiedIdent = qualifyIdentifier(name)
+
+ // Check cache first (no synchronization needed for reads)
+ functionRegistry.lookupFunctionEntry(qualifiedIdent) match {
+ case Some((cachedInfo, cachedBuilder)) =>
+ // Already cached - return with eager builder
+ V1Function(cachedInfo, cachedBuilder)
+
+ case None =>
+ // Fetch metadata eagerly (no resource loading yet)
+ val funcMetadata = fetchCatalogFunction(qualifiedIdent)
+ val info = if (funcMetadata.isUserDefinedFunction) {
+ UserDefinedFunction.fromCatalogFunction(funcMetadata,
parser).toExpressionInfo
+ } else {
+ makeExprInfoForHiveFunction(funcMetadata)
+ }
+
+ // Builder factory - loads resources only on first invoke()
+ val builderFactory: () => FunctionBuilder = () => synchronized {
+ // Re-check cache (another thread may have loaded it)
+ functionRegistry.lookupFunctionBuilder(qualifiedIdent).getOrElse {
+ loadFunctionResources(funcMetadata.resources)
+ if (funcMetadata.isUserDefinedFunction) {
+ val udf = UserDefinedFunction.fromCatalogFunction(funcMetadata,
parser)
+ registerUserDefinedFunction[Expression](
+ udf,
+ overrideIfExists = false,
+ functionRegistry,
+ makeUserDefinedScalarFuncBuilder(udf))
+ } else {
+ registerFunction(
+ funcMetadata,
+ overrideIfExists = false,
+ functionRegistry,
+ makeFunctionBuilder(funcMetadata))
+ }
+ functionRegistry.lookupFunctionBuilder(qualifiedIdent).get
+ }
+ }
+
+ V1Function(info, builderFactory)
+ }
}
/**
* Look up a persistent table function by name and resolves it to a
LogicalPlan.
*/
def resolvePersistentTableFunction(
name: FunctionIdentifier,
- arguments: Seq[Expression]): LogicalPlan = {
- resolvePersistentFunctionInternal[LogicalPlan](
- name,
- arguments,
- tableFunctionRegistry,
- // We don't support persistent Hive table functions yet.
- registerHiveFunc = (func: CatalogFunction) => failFunctionLookup(name),
- registerUserDefinedFunc = function => {
- val builder = makeUserDefinedTableFuncBuilder(function)
- registerUserDefinedFunction[LogicalPlan](
- function = function,
- overrideIfExists = false,
- registry = tableFunctionRegistry,
- functionBuilder = builder)
+ arguments: Seq[Expression]): LogicalPlan = synchronized {
+ val qualifiedIdent = qualifyIdentifier(name)
+ if (tableFunctionRegistry.functionExists(qualifiedIdent)) {
+ // Already cached
+ tableFunctionRegistry.lookupFunction(qualifiedIdent, arguments)
+ } else {
+ // Load from catalog
+ val funcMetadata = fetchCatalogFunction(qualifiedIdent)
+ if (!funcMetadata.isUserDefinedFunction) {
+ // Hive table functions are not supported
+ failFunctionLookup(qualifiedIdent)
}
- )
+ loadFunctionResources(funcMetadata.resources)
+ val udf = UserDefinedFunction.fromCatalogFunction(funcMetadata, parser)
+ registerUserDefinedFunction[LogicalPlan](
+ udf,
+ overrideIfExists = false,
+ tableFunctionRegistry,
+ makeUserDefinedTableFuncBuilder(udf))
+ tableFunctionRegistry.lookupFunction(qualifiedIdent, arguments)
+ }
}
- private def resolvePersistentFunctionInternal[T](
- name: FunctionIdentifier,
- arguments: Seq[Expression],
- registry: FunctionRegistryBase[T],
- registerHiveFunc: CatalogFunction => Unit,
- registerUserDefinedFunc: UserDefinedFunction => Unit): T = {
- // `synchronized` is used to prevent multiple threads from concurrently
resolving the
- // same function that has not yet been loaded into the function registry.
This is needed
- // because calling `registerFunction` twice with `overrideIfExists =
false` can lead to
- // a FunctionAlreadyExistsException.
- synchronized {
- val qualifiedIdent = qualifyIdentifier(name)
- val db = qualifiedIdent.database.get
- val funcName = qualifiedIdent.funcName
- if (registry.functionExists(qualifiedIdent)) {
- // This function has been already loaded into the function registry.
- registry.lookupFunction(qualifiedIdent, arguments)
Review Comment:
V2 functions do not support table functions yet, so scalar and table
functions have different code paths:
- scalar function looks up the cache at
https://github.com/apache/spark/pull/53627/files#diff-9dd0899e5406230aeff96654432da54f35255f6dc60eecb87264a5c508a8c826R2186
- table function looks up the cache at
https://github.com/apache/spark/pull/53627/files#diff-9dd0899e5406230aeff96654432da54f35255f6dc60eecb87264a5c508a8c826R2236
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]