cloud-fan commented on a change in pull request #35004:
URL: https://github.com/apache/spark/pull/35004#discussion_r779552817



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
   }
 
   /**
-   * Look up the [[ExpressionInfo]] associated with the specified function, 
assuming it exists.
+   * Look up the `ExpressionInfo` of the given function by name if it's a 
built-in or temp function.
+   * This supports both scalar and table functions.
    */
-  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = 
synchronized {
-    // TODO: just make function registry take in FunctionIdentifier instead of 
duplicating this
+  def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+    
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+      def isBuiltin(ident: FunctionIdentifier): Boolean = {
+        FunctionRegistry.builtin.functionExists(ident) ||
+          TableFunctionRegistry.builtin.functionExists(ident)
+      }
+      def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+        functionRegistry.lookupFunction(ident).orElse(
+          tableFunctionRegistry.lookupFunction(ident))
+      }
+      synchronized(lookupTempFuncWithViewContext(name, isBuiltin, lookup))
+    }
+  }
+
+  /**
+   * Looks up a built-in or temp scalar function by name and resolves it to an 
Expression if such
+   * a function exists.
+   */
+  def resolveBuiltinOrTempFunction(name: String, arguments: Seq[Expression]): 
Option[Expression] = {
+    resolveBuiltinOrTempFunctionInternal(
+      name, arguments, FunctionRegistry.builtin.functionExists, 
functionRegistry)
+  }
+
+  /**
+   * Looks up a built-in or temp table function by name and resolves it to a 
LogicalPlan if such
+   * a function exists.
+   */
+  def resolveBuiltinOrTempTableFunction(
+      name: String, arguments: Seq[Expression]): Option[LogicalPlan] = {
+    resolveBuiltinOrTempFunctionInternal(
+      name, arguments, TableFunctionRegistry.builtin.functionExists, 
tableFunctionRegistry)
+  }
+
+  private def resolveBuiltinOrTempFunctionInternal[T](
+      name: String,
+      arguments: Seq[Expression],
+      isBuiltin: FunctionIdentifier => Boolean,
+      registry: FunctionRegistryBase[T]): Option[T] = synchronized {
+    val funcIdent = FunctionIdentifier(name)
+    if (!registry.functionExists(funcIdent)) {
+      None
+    } else {
+      lookupTempFuncWithViewContext(
+        name, isBuiltin, ident => Option(registry.lookupFunction(ident, 
arguments)))
+    }
+  }
+
+  private def lookupTempFuncWithViewContext[T](
+      name: String,
+      isBuiltin: FunctionIdentifier => Boolean,
+      lookupFunc: FunctionIdentifier => Option[T]): Option[T] = {
+    val funcIdent = FunctionIdentifier(name)
+    if (isBuiltin(funcIdent)) {
+      lookupFunc(funcIdent)
+    } else {
+      val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+      val referredTempFunctionNames = 
AnalysisContext.get.referredTempFunctionNames
+      if (isResolvingView) {
+        // When resolving a view, only return a temp function if it's referred 
by this view.
+        if (referredTempFunctionNames.contains(name)) {
+          lookupFunc(funcIdent)
+        } else {
+          None
+        }
+      } else {
+        val result = lookupFunc(funcIdent)
+        if (result.isDefined) {
+          // We are not resolving a view and the function is a temp one, add 
it to
+          // `AnalysisContext`, so during the view creation, we can save all 
referred temp
+          // functions to view metadata.
+          AnalysisContext.get.referredTempFunctionNames.add(name)
+        }
+        result
+      }
+    }
+  }
+
+  /**
+   * Look up the `ExpressionInfo` of the given function by name if it's a 
persistent function.
+   * This supports both scalar and table functions.
+   */
+  def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
     val database = 
name.database.orElse(Some(currentDb)).map(formatDatabaseName)
     val qualifiedName = name.copy(database = database)
-    functionRegistry.lookupFunction(name)
-      .orElse(functionRegistry.lookupFunction(qualifiedName))
-      .orElse(tableFunctionRegistry.lookupFunction(name))
+    functionRegistry.lookupFunction(qualifiedName)
+      .orElse(tableFunctionRegistry.lookupFunction(qualifiedName))
       .getOrElse {
         val db = qualifiedName.database.get
         requireDbExists(db)
         if (externalCatalog.functionExists(db, name.funcName)) {
           val metadata = externalCatalog.getFunction(db, name.funcName)
-          new ExpressionInfo(
-            metadata.className,
-            qualifiedName.database.orNull,
-            qualifiedName.identifier,
-            null,
-            "",
-            "",
-            "",
-            "",
-            "",
-            "",
-            "hive")
+          makeExprInfoForHiveFunction(metadata.copy(identifier = 
qualifiedName))
         } else {
           failFunctionLookup(name)
         }
       }
   }
 
   /**
-   * Look up a specific function, assuming it exists.
-   *
-   * For a temporary function or a permanent function that has been loaded,
-   * this method will simply lookup the function through the
-   * FunctionRegistry and create an expression based on the builder.
-   *
-   * For a permanent function that has not been loaded, we will first fetch 
its metadata
-   * from the underlying external catalog. Then, we will load all resources 
associated
-   * with this function (i.e. jars and files). Finally, we create a function 
builder
-   * based on the function class and put the builder into the FunctionRegistry.
-   * The name of this function in the FunctionRegistry will be 
`databaseName.functionName`.
+   * Looks up a persistent scalar function by name and resolves it to an 
Expression.
    */
-  private def lookupFunction[T](
-      name: FunctionIdentifier,
-      children: Seq[Expression],
-      registry: FunctionRegistryBase[T]): T = synchronized {
-    import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-
-    // Note: the implementation of this function is a little bit convoluted.
-    // We probably shouldn't use a single FunctionRegistry to register all 
three kinds of functions
-    // (built-in, temp, and external).
-    if (name.database.isEmpty && registry.functionExists(name)) {
-      val referredTempFunctionNames = 
AnalysisContext.get.referredTempFunctionNames
-      val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
-      // Lookup the function as a temporary or a built-in function (i.e. 
without database) and
-      // 1. if we are not resolving view, we don't care about the function 
type and just return it.
-      // 2. if we are resolving view, only return a temp function if it's 
referred by this view.
-      if (!isResolvingView ||
-          !isTemporaryFunction(name) ||
-          referredTempFunctionNames.contains(name.funcName)) {
-        // We are not resolving a view and the function is a temp one, add it 
to `AnalysisContext`,
-        // so during the view creation, we can save all referred temp 
functions to view metadata
-        if (!isResolvingView && isTemporaryFunction(name)) {
-          AnalysisContext.get.referredTempFunctionNames.add(name.funcName)
-        }
-        // This function has been already loaded into the function registry.
-        return registry.lookupFunction(name, children)
-      }
-    }
+  def resolvePersistentFunction(
+      name: FunctionIdentifier, arguments: Seq[Expression]): Expression = {
+    resolvePersistentFunctionInternal(name, arguments, functionRegistry, 
makeFunctionBuilder)
+  }
 
-    // Get the database from AnalysisContext if it's defined, otherwise, use 
current database
-    val currentDatabase = AnalysisContext.get.catalogAndNamespace match {
-      case Seq() => getCurrentDatabase
-      case Seq(_, db) => db
-      case Seq(catalog, namespace @ _*) =>
-        throw new IllegalStateException(s"[BUG] unexpected v2 catalog: 
$catalog, and " +
-          s"namespace: ${namespace.quoted} in v1 function lookup")
-    }
+  /**
+   * Looks up a persistent table function by name and resolves it to a 
LogicalPlan.
+   */
+  def resolvePersistentTableFunction(
+      name: FunctionIdentifier,
+      arguments: Seq[Expression]): LogicalPlan = {
+    // We don't support persistent table functions yet.
+    val builder = (func: CatalogFunction) => failFunctionLookup(name)
+    resolvePersistentFunctionInternal(name, arguments, tableFunctionRegistry, 
builder)
+  }
 
-    // If the name itself is not qualified, add the current database to it.
-    val database = formatDatabaseName(name.database.getOrElse(currentDatabase))
+  private def resolvePersistentFunctionInternal[T](
+      name: FunctionIdentifier,
+      arguments: Seq[Expression],
+      registry: FunctionRegistryBase[T],
+      createFunctionBuilder: CatalogFunction => 
FunctionRegistryBase[T]#FunctionBuilder): T = {
+    val database = formatDatabaseName(name.database.getOrElse(currentDb))
     val qualifiedName = name.copy(database = Some(database))
-
     if (registry.functionExists(qualifiedName)) {
       // This function has been already loaded into the function registry.
-      // Unlike the above block, we find this function by using the qualified 
name.
-      return registry.lookupFunction(qualifiedName, children)
-    }
-
-    // The function has not been loaded to the function registry, which means
-    // that the function is a permanent function (if it actually has been 
registered
-    // in the metastore). We need to first put the function in the 
FunctionRegistry.
-    // TODO: why not just check whether the function exists first?
-    val catalogFunction = try {
-      externalCatalog.getFunction(database, name.funcName)
-    } catch {
-      case _: AnalysisException => failFunctionLookup(name)
+      registry.lookupFunction(qualifiedName, arguments)
+    } else {
+      // The function has not been loaded to the function registry, which means
+      // that the function is a persistent function (if it actually has been 
registered
+      // in the metastore). We need to first put the function in the function 
registry.
+      val catalogFunction = try {
+        externalCatalog.getFunction(database, qualifiedName.funcName)
+      } catch {
+        case _: AnalysisException => failFunctionLookup(qualifiedName)
+      }
+      loadFunctionResources(catalogFunction.resources)
+      // Please note that qualifiedName is provided by the user. However,
+      // catalogFunction.identifier.unquotedString is returned by the 
underlying
+      // catalog. So, it is possible that qualifiedName is not exactly the 
same as
+      // catalogFunction.identifier.unquotedString (difference is on 
case-sensitivity).
+      // At here, we preserve the input from the user.
+      val funcMetadata = catalogFunction.copy(identifier = qualifiedName)
+      registerFunction(
+        funcMetadata,
+        overrideIfExists = false,
+        registry = registry,
+        functionBuilder = createFunctionBuilder(funcMetadata))
+      // Now, we need to create the Expression.
+      registry.lookupFunction(qualifiedName, arguments)
     }
-    loadFunctionResources(catalogFunction.resources)
-    // Please note that qualifiedName is provided by the user. However,
-    // catalogFunction.identifier.unquotedString is returned by the underlying
-    // catalog. So, it is possible that qualifiedName is not exactly the same 
as
-    // catalogFunction.identifier.unquotedString (difference is on 
case-sensitivity).
-    // At here, we preserve the input from the user.
-    registerFunction(catalogFunction.copy(identifier = qualifiedName), 
overrideIfExists = false)
-    // Now, we need to create the Expression.
-    registry.lookupFunction(qualifiedName, children)
   }
 
   /**
-   * Return an [[Expression]] that represents the specified function, assuming 
it exists.
+   * Look up the [[ExpressionInfo]] associated with the specified function, 
assuming it exists.
    */
-  def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): 
Expression = {
-    lookupFunction[Expression](name, children, functionRegistry)
+  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = 
synchronized {
+    if (name.database.isEmpty) {
+      
lookupBuiltinOrTempFunction(name.funcName).getOrElse(lookupPersistentFunction(name))
+    } else {
+      lookupPersistentFunction(name)
+    }
   }
 
-  /**
-   * Return a [[LogicalPlan]] that represents the specified function, assuming 
it exists.
-   */
-  def lookupTableFunction(name: FunctionIdentifier, children: 
Seq[Expression]): LogicalPlan = {
-    lookupFunction[LogicalPlan](name, children, tableFunctionRegistry)
+  // Test only

Review comment:
       Because the caller side always looks up temp/built-in function first, 
then persistent function. The persistent function lookup part may need to look 
up v2 catalogs, so this `lookupFunction` is no longer correct.
   
   We can bring back `lookupTableFunction` though, as v2 catalog doesn't 
support it yet.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to