allisonwang-db commented on a change in pull request #35004:
URL: https://github.com/apache/spark/pull/35004#discussion_r779322815
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
}
/**
- * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
+ * Look up the `ExpressionInfo` of the given function by name if it's a
built-in or temp function.
+ * This supports both scalar and table functions.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
- // TODO: just make function registry take in FunctionIdentifier instead of
duplicating this
+ def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+ def isBuiltin(ident: FunctionIdentifier): Boolean = {
+ FunctionRegistry.builtin.functionExists(ident) ||
+ TableFunctionRegistry.builtin.functionExists(ident)
+ }
+ def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+ functionRegistry.lookupFunction(ident).orElse(
+ tableFunctionRegistry.lookupFunction(ident))
+ }
+ synchronized(lookupTempFuncWithViewContext(name, isBuiltin, lookup))
+ }
+ }
+
+ /**
+ * Looks up a built-in or temp scalar function by name and resolves it to an
Expression if such
Review comment:
nit: `Looks` -> `Look`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
}
/**
- * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
+ * Look up the `ExpressionInfo` of the given function by name if it's a
built-in or temp function.
+ * This supports both scalar and table functions.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
- // TODO: just make function registry take in FunctionIdentifier instead of
duplicating this
+ def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+ def isBuiltin(ident: FunctionIdentifier): Boolean = {
Review comment:
We can add this method as `isBuiltinFunction` to `SessionCatalog` and use it
to refactor `isTemporaryFunction`.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
##########
@@ -190,15 +196,23 @@ case class ResolvedView(identifier: Identifier, isTemp:
Boolean) extends LeafNod
}
/**
- * A plan containing resolved function.
+ * A plan containing resolved persistent function.
*/
-// TODO: create a generic representation for v1, v2 function, after we add
function
-// support to v2 catalog. For now we only need the identifier to
fallback to v1 command.
-case class ResolvedFunc(identifier: Identifier)
+case class ResolvedPersistentFunc(
+ catalog: FunctionCatalog,
+ identifier: Identifier,
+ func: UnboundFunction)
extends LeafNode {
override def output: Seq[Attribute] = Nil
}
+/**
+ * A plan containing resolved non-persistent (temp or built-in) function.
+ */
+case class ResolvedNonPersistentFunc(name: String, func: UnboundFunction)
extends LeafNode {
Review comment:
Can temp or built-in functions only be `V1Function`s?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
}
/**
- * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
+ * Look up the `ExpressionInfo` of the given function by name if it's a
built-in or temp function.
+ * This supports both scalar and table functions.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
- // TODO: just make function registry take in FunctionIdentifier instead of
duplicating this
+ def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+ def isBuiltin(ident: FunctionIdentifier): Boolean = {
+ FunctionRegistry.builtin.functionExists(ident) ||
+ TableFunctionRegistry.builtin.functionExists(ident)
+ }
+ def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+ functionRegistry.lookupFunction(ident).orElse(
+ tableFunctionRegistry.lookupFunction(ident))
+ }
+ synchronized(lookupTempFuncWithViewContext(name, isBuiltin, lookup))
+ }
+ }
+
+ /**
+ * Looks up a built-in or temp scalar function by name and resolves it to an
Expression if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempFunction(name: String, arguments: Seq[Expression]):
Option[Expression] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, FunctionRegistry.builtin.functionExists,
functionRegistry)
+ }
+
+ /**
+ * Looks up a built-in or temp table function by name and resolves it to a
LogicalPlan if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempTableFunction(
+ name: String, arguments: Seq[Expression]): Option[LogicalPlan] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, TableFunctionRegistry.builtin.functionExists,
tableFunctionRegistry)
+ }
+
+ private def resolveBuiltinOrTempFunctionInternal[T](
+ name: String,
+ arguments: Seq[Expression],
+ isBuiltin: FunctionIdentifier => Boolean,
+ registry: FunctionRegistryBase[T]): Option[T] = synchronized {
+ val funcIdent = FunctionIdentifier(name)
+ if (!registry.functionExists(funcIdent)) {
+ None
+ } else {
+ lookupTempFuncWithViewContext(
+ name, isBuiltin, ident => Option(registry.lookupFunction(ident,
arguments)))
+ }
+ }
+
+ private def lookupTempFuncWithViewContext[T](
+ name: String,
+ isBuiltin: FunctionIdentifier => Boolean,
+ lookupFunc: FunctionIdentifier => Option[T]): Option[T] = {
+ val funcIdent = FunctionIdentifier(name)
+ if (isBuiltin(funcIdent)) {
+ lookupFunc(funcIdent)
+ } else {
+ val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+ val referredTempFunctionNames =
AnalysisContext.get.referredTempFunctionNames
+ if (isResolvingView) {
+ // When resolving a view, only return a temp function if it's referred
by this view.
+ if (referredTempFunctionNames.contains(name)) {
+ lookupFunc(funcIdent)
+ } else {
+ None
+ }
+ } else {
+ val result = lookupFunc(funcIdent)
+ if (result.isDefined) {
+ // We are not resolving a view and the function is a temp one, add
it to
+ // `AnalysisContext`, so during the view creation, we can save all
referred temp
+ // functions to view metadata.
+ AnalysisContext.get.referredTempFunctionNames.add(name)
+ }
+ result
+ }
+ }
+ }
+
+ /**
+ * Look up the `ExpressionInfo` of the given function by name if it's a
persistent function.
+ * This supports both scalar and table functions.
+ */
+ def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
val database =
name.database.orElse(Some(currentDb)).map(formatDatabaseName)
val qualifiedName = name.copy(database = database)
- functionRegistry.lookupFunction(name)
- .orElse(functionRegistry.lookupFunction(qualifiedName))
- .orElse(tableFunctionRegistry.lookupFunction(name))
+ functionRegistry.lookupFunction(qualifiedName)
+ .orElse(tableFunctionRegistry.lookupFunction(qualifiedName))
.getOrElse {
val db = qualifiedName.database.get
requireDbExists(db)
if (externalCatalog.functionExists(db, name.funcName)) {
val metadata = externalCatalog.getFunction(db, name.funcName)
- new ExpressionInfo(
- metadata.className,
- qualifiedName.database.orNull,
- qualifiedName.identifier,
- null,
- "",
- "",
- "",
- "",
- "",
- "",
- "hive")
+ makeExprInfoForHiveFunction(metadata.copy(identifier =
qualifiedName))
} else {
failFunctionLookup(name)
}
}
}
/**
- * Look up a specific function, assuming it exists.
- *
- * For a temporary function or a permanent function that has been loaded,
- * this method will simply lookup the function through the
- * FunctionRegistry and create an expression based on the builder.
- *
- * For a permanent function that has not been loaded, we will first fetch
its metadata
- * from the underlying external catalog. Then, we will load all resources
associated
- * with this function (i.e. jars and files). Finally, we create a function
builder
- * based on the function class and put the builder into the FunctionRegistry.
- * The name of this function in the FunctionRegistry will be
`databaseName.functionName`.
+ * Looks up a persistent scalar function by name and resolves it to an
Expression.
Review comment:
nit: `Looks` -> `Look`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
##########
@@ -743,6 +743,36 @@ object FunctionRegistry {
val functionSet: Set[FunctionIdentifier] = builtin.listFunction().toSet
+ private def makeExprInfoForVirtualOperator(name: String, usage: String):
ExpressionInfo = {
+ new ExpressionInfo(
+ null,
+ null,
+ name,
+ usage,
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "built-in")
+ }
+
+ val builtinOperators: Map[String, ExpressionInfo] = Map(
+ "<>" -> makeExprInfoForVirtualOperator("<>",
+ "Usage: expr1 <> expr2 - Returns true if `expr1` is not equal to
`expr2`."),
+ "!=" -> makeExprInfoForVirtualOperator("!=",
+ "Usage: expr1 != expr2 - Returns true if `expr1` is not equal to
`expr2`."),
+ "between" -> makeExprInfoForVirtualOperator("between",
+ "Usage: expr1 [NOT] BETWEEN expr2 AND expr3 - " +
+ "evaluate if `expr1` is [not] in between `expr2` and `expr3`."),
+ "case" -> makeExprInfoForVirtualOperator("case",
+ "Usage: CASE expr1 WHEN expr2 THEN expr3 [WHEN expr4 THEN expr5]* [ELSE
expr6] END " +
+ "- When `expr1` = `expr2`, returns `expr3`; when `expr1` = `expr4`,
return `expr5`; " +
+ "else return `expr6`."),
+ "||" -> FunctionRegistryBase.expressionInfo[Concat]("||", None)
Review comment:
The description for `Concat` doesn't describe `||` well: `Usage:
||(col1, col2, ..., colN) - Returns the concatenation of col1, col2, ...,
colN.` Shall we use the original description for `||` instead?
```
expr1 || expr2 - Returns the concatenation of `expr1` and `expr2`
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
##########
@@ -743,6 +743,36 @@ object FunctionRegistry {
val functionSet: Set[FunctionIdentifier] = builtin.listFunction().toSet
+ private def makeExprInfoForVirtualOperator(name: String, usage: String):
ExpressionInfo = {
+ new ExpressionInfo(
+ null,
+ null,
+ name,
+ usage,
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "built-in")
+ }
+
+ val builtinOperators: Map[String, ExpressionInfo] = Map(
+ "<>" -> makeExprInfoForVirtualOperator("<>",
+ "Usage: expr1 <> expr2 - Returns true if `expr1` is not equal to
`expr2`."),
Review comment:
We should remove the `Usage: ` here.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
}
/**
- * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
+ * Look up the `ExpressionInfo` of the given function by name if it's a
built-in or temp function.
+ * This supports both scalar and table functions.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
- // TODO: just make function registry take in FunctionIdentifier instead of
duplicating this
+ def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+ def isBuiltin(ident: FunctionIdentifier): Boolean = {
+ FunctionRegistry.builtin.functionExists(ident) ||
+ TableFunctionRegistry.builtin.functionExists(ident)
+ }
+ def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+ functionRegistry.lookupFunction(ident).orElse(
+ tableFunctionRegistry.lookupFunction(ident))
+ }
+ synchronized(lookupTempFuncWithViewContext(name, isBuiltin, lookup))
+ }
+ }
+
+ /**
+ * Looks up a built-in or temp scalar function by name and resolves it to an
Expression if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempFunction(name: String, arguments: Seq[Expression]):
Option[Expression] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, FunctionRegistry.builtin.functionExists,
functionRegistry)
+ }
+
+ /**
+ * Looks up a built-in or temp table function by name and resolves it to a
LogicalPlan if such
Review comment:
nit: `Looks` -> `Look`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
}
/**
- * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
+ * Look up the `ExpressionInfo` of the given function by name if it's a
built-in or temp function.
+ * This supports both scalar and table functions.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
- // TODO: just make function registry take in FunctionIdentifier instead of
duplicating this
+ def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+ def isBuiltin(ident: FunctionIdentifier): Boolean = {
+ FunctionRegistry.builtin.functionExists(ident) ||
+ TableFunctionRegistry.builtin.functionExists(ident)
+ }
+ def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+ functionRegistry.lookupFunction(ident).orElse(
+ tableFunctionRegistry.lookupFunction(ident))
+ }
+ synchronized(lookupTempFuncWithViewContext(name, isBuiltin, lookup))
+ }
+ }
+
+ /**
+ * Looks up a built-in or temp scalar function by name and resolves it to an
Expression if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempFunction(name: String, arguments: Seq[Expression]):
Option[Expression] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, FunctionRegistry.builtin.functionExists,
functionRegistry)
+ }
+
+ /**
+ * Looks up a built-in or temp table function by name and resolves it to a
LogicalPlan if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempTableFunction(
+ name: String, arguments: Seq[Expression]): Option[LogicalPlan] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, TableFunctionRegistry.builtin.functionExists,
tableFunctionRegistry)
+ }
+
+ private def resolveBuiltinOrTempFunctionInternal[T](
+ name: String,
+ arguments: Seq[Expression],
+ isBuiltin: FunctionIdentifier => Boolean,
+ registry: FunctionRegistryBase[T]): Option[T] = synchronized {
+ val funcIdent = FunctionIdentifier(name)
+ if (!registry.functionExists(funcIdent)) {
+ None
+ } else {
+ lookupTempFuncWithViewContext(
+ name, isBuiltin, ident => Option(registry.lookupFunction(ident,
arguments)))
+ }
+ }
+
+ private def lookupTempFuncWithViewContext[T](
+ name: String,
+ isBuiltin: FunctionIdentifier => Boolean,
+ lookupFunc: FunctionIdentifier => Option[T]): Option[T] = {
+ val funcIdent = FunctionIdentifier(name)
+ if (isBuiltin(funcIdent)) {
+ lookupFunc(funcIdent)
+ } else {
+ val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+ val referredTempFunctionNames =
AnalysisContext.get.referredTempFunctionNames
+ if (isResolvingView) {
Review comment:
This is so much more clear!
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
}
/**
- * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
+ * Look up the `ExpressionInfo` of the given function by name if it's a
built-in or temp function.
+ * This supports both scalar and table functions.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
- // TODO: just make function registry take in FunctionIdentifier instead of
duplicating this
+ def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+ def isBuiltin(ident: FunctionIdentifier): Boolean = {
+ FunctionRegistry.builtin.functionExists(ident) ||
+ TableFunctionRegistry.builtin.functionExists(ident)
+ }
+ def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+ functionRegistry.lookupFunction(ident).orElse(
+ tableFunctionRegistry.lookupFunction(ident))
+ }
+ synchronized(lookupTempFuncWithViewContext(name, isBuiltin, lookup))
+ }
+ }
+
+ /**
+ * Looks up a built-in or temp scalar function by name and resolves it to an
Expression if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempFunction(name: String, arguments: Seq[Expression]):
Option[Expression] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, FunctionRegistry.builtin.functionExists,
functionRegistry)
+ }
+
+ /**
+ * Looks up a built-in or temp table function by name and resolves it to a
LogicalPlan if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempTableFunction(
+ name: String, arguments: Seq[Expression]): Option[LogicalPlan] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, TableFunctionRegistry.builtin.functionExists,
tableFunctionRegistry)
+ }
+
+ private def resolveBuiltinOrTempFunctionInternal[T](
+ name: String,
+ arguments: Seq[Expression],
+ isBuiltin: FunctionIdentifier => Boolean,
+ registry: FunctionRegistryBase[T]): Option[T] = synchronized {
+ val funcIdent = FunctionIdentifier(name)
+ if (!registry.functionExists(funcIdent)) {
+ None
+ } else {
+ lookupTempFuncWithViewContext(
+ name, isBuiltin, ident => Option(registry.lookupFunction(ident,
arguments)))
+ }
+ }
+
+ private def lookupTempFuncWithViewContext[T](
+ name: String,
+ isBuiltin: FunctionIdentifier => Boolean,
+ lookupFunc: FunctionIdentifier => Option[T]): Option[T] = {
+ val funcIdent = FunctionIdentifier(name)
+ if (isBuiltin(funcIdent)) {
+ lookupFunc(funcIdent)
+ } else {
+ val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+ val referredTempFunctionNames =
AnalysisContext.get.referredTempFunctionNames
+ if (isResolvingView) {
+ // When resolving a view, only return a temp function if it's referred
by this view.
+ if (referredTempFunctionNames.contains(name)) {
+ lookupFunc(funcIdent)
+ } else {
+ None
+ }
+ } else {
+ val result = lookupFunc(funcIdent)
+ if (result.isDefined) {
+ // We are not resolving a view and the function is a temp one, add
it to
+ // `AnalysisContext`, so during the view creation, we can save all
referred temp
+ // functions to view metadata.
+ AnalysisContext.get.referredTempFunctionNames.add(name)
+ }
+ result
+ }
+ }
+ }
+
+ /**
+ * Look up the `ExpressionInfo` of the given function by name if it's a
persistent function.
+ * This supports both scalar and table functions.
+ */
+ def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
val database =
name.database.orElse(Some(currentDb)).map(formatDatabaseName)
val qualifiedName = name.copy(database = database)
- functionRegistry.lookupFunction(name)
- .orElse(functionRegistry.lookupFunction(qualifiedName))
- .orElse(tableFunctionRegistry.lookupFunction(name))
+ functionRegistry.lookupFunction(qualifiedName)
+ .orElse(tableFunctionRegistry.lookupFunction(qualifiedName))
.getOrElse {
val db = qualifiedName.database.get
requireDbExists(db)
if (externalCatalog.functionExists(db, name.funcName)) {
val metadata = externalCatalog.getFunction(db, name.funcName)
- new ExpressionInfo(
- metadata.className,
- qualifiedName.database.orNull,
- qualifiedName.identifier,
- null,
- "",
- "",
- "",
- "",
- "",
- "",
- "hive")
+ makeExprInfoForHiveFunction(metadata.copy(identifier =
qualifiedName))
} else {
failFunctionLookup(name)
}
}
}
/**
- * Look up a specific function, assuming it exists.
- *
- * For a temporary function or a permanent function that has been loaded,
- * this method will simply lookup the function through the
- * FunctionRegistry and create an expression based on the builder.
- *
- * For a permanent function that has not been loaded, we will first fetch
its metadata
- * from the underlying external catalog. Then, we will load all resources
associated
- * with this function (i.e. jars and files). Finally, we create a function
builder
- * based on the function class and put the builder into the FunctionRegistry.
- * The name of this function in the FunctionRegistry will be
`databaseName.functionName`.
+ * Looks up a persistent scalar function by name and resolves it to an
Expression.
*/
- private def lookupFunction[T](
- name: FunctionIdentifier,
- children: Seq[Expression],
- registry: FunctionRegistryBase[T]): T = synchronized {
- import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-
- // Note: the implementation of this function is a little bit convoluted.
- // We probably shouldn't use a single FunctionRegistry to register all
three kinds of functions
- // (built-in, temp, and external).
- if (name.database.isEmpty && registry.functionExists(name)) {
- val referredTempFunctionNames =
AnalysisContext.get.referredTempFunctionNames
- val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
- // Lookup the function as a temporary or a built-in function (i.e.
without database) and
- // 1. if we are not resolving view, we don't care about the function
type and just return it.
- // 2. if we are resolving view, only return a temp function if it's
referred by this view.
- if (!isResolvingView ||
- !isTemporaryFunction(name) ||
- referredTempFunctionNames.contains(name.funcName)) {
- // We are not resolving a view and the function is a temp one, add it
to `AnalysisContext`,
- // so during the view creation, we can save all referred temp
functions to view metadata
- if (!isResolvingView && isTemporaryFunction(name)) {
- AnalysisContext.get.referredTempFunctionNames.add(name.funcName)
- }
- // This function has been already loaded into the function registry.
- return registry.lookupFunction(name, children)
- }
- }
+ def resolvePersistentFunction(
+ name: FunctionIdentifier, arguments: Seq[Expression]): Expression = {
+ resolvePersistentFunctionInternal(name, arguments, functionRegistry,
makeFunctionBuilder)
+ }
- // Get the database from AnalysisContext if it's defined, otherwise, use
current database
- val currentDatabase = AnalysisContext.get.catalogAndNamespace match {
- case Seq() => getCurrentDatabase
- case Seq(_, db) => db
- case Seq(catalog, namespace @ _*) =>
- throw new IllegalStateException(s"[BUG] unexpected v2 catalog:
$catalog, and " +
- s"namespace: ${namespace.quoted} in v1 function lookup")
- }
+ /**
+ * Looks up a persistent table function by name and resolves it to a
LogicalPlan.
+ */
+ def resolvePersistentTableFunction(
+ name: FunctionIdentifier,
+ arguments: Seq[Expression]): LogicalPlan = {
+ // We don't support persistent table functions yet.
+ val builder = (func: CatalogFunction) => failFunctionLookup(name)
+ resolvePersistentFunctionInternal(name, arguments, tableFunctionRegistry,
builder)
+ }
- // If the name itself is not qualified, add the current database to it.
- val database = formatDatabaseName(name.database.getOrElse(currentDatabase))
+ private def resolvePersistentFunctionInternal[T](
+ name: FunctionIdentifier,
+ arguments: Seq[Expression],
+ registry: FunctionRegistryBase[T],
+ createFunctionBuilder: CatalogFunction =>
FunctionRegistryBase[T]#FunctionBuilder): T = {
+ val database = formatDatabaseName(name.database.getOrElse(currentDb))
val qualifiedName = name.copy(database = Some(database))
-
if (registry.functionExists(qualifiedName)) {
// This function has been already loaded into the function registry.
- // Unlike the above block, we find this function by using the qualified
name.
- return registry.lookupFunction(qualifiedName, children)
- }
-
- // The function has not been loaded to the function registry, which means
- // that the function is a permanent function (if it actually has been
registered
- // in the metastore). We need to first put the function in the
FunctionRegistry.
- // TODO: why not just check whether the function exists first?
- val catalogFunction = try {
- externalCatalog.getFunction(database, name.funcName)
- } catch {
- case _: AnalysisException => failFunctionLookup(name)
+ registry.lookupFunction(qualifiedName, arguments)
+ } else {
+ // The function has not been loaded to the function registry, which means
+ // that the function is a persistent function (if it actually has been
registered
+ // in the metastore). We need to first put the function in the function
registry.
+ val catalogFunction = try {
+ externalCatalog.getFunction(database, qualifiedName.funcName)
+ } catch {
+ case _: AnalysisException => failFunctionLookup(qualifiedName)
+ }
+ loadFunctionResources(catalogFunction.resources)
+ // Please note that qualifiedName is provided by the user. However,
+ // catalogFunction.identifier.unquotedString is returned by the
underlying
+ // catalog. So, it is possible that qualifiedName is not exactly the
same as
+ // catalogFunction.identifier.unquotedString (difference is on
case-sensitivity).
+ // At here, we preserve the input from the user.
+ val funcMetadata = catalogFunction.copy(identifier = qualifiedName)
+ registerFunction(
+ funcMetadata,
+ overrideIfExists = false,
+ registry = registry,
+ functionBuilder = createFunctionBuilder(funcMetadata))
+ // Now, we need to create the Expression.
+ registry.lookupFunction(qualifiedName, arguments)
}
- loadFunctionResources(catalogFunction.resources)
- // Please note that qualifiedName is provided by the user. However,
- // catalogFunction.identifier.unquotedString is returned by the underlying
- // catalog. So, it is possible that qualifiedName is not exactly the same
as
- // catalogFunction.identifier.unquotedString (difference is on
case-sensitivity).
- // At here, we preserve the input from the user.
- registerFunction(catalogFunction.copy(identifier = qualifiedName),
overrideIfExists = false)
- // Now, we need to create the Expression.
- registry.lookupFunction(qualifiedName, children)
}
/**
- * Return an [[Expression]] that represents the specified function, assuming
it exists.
+ * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
*/
- def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]):
Expression = {
- lookupFunction[Expression](name, children, functionRegistry)
+ def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
+ if (name.database.isEmpty) {
+
lookupBuiltinOrTempFunction(name.funcName).getOrElse(lookupPersistentFunction(name))
+ } else {
+ lookupPersistentFunction(name)
+ }
}
- /**
- * Return a [[LogicalPlan]] that represents the specified function, assuming
it exists.
- */
- def lookupTableFunction(name: FunctionIdentifier, children:
Seq[Expression]): LogicalPlan = {
- lookupFunction[LogicalPlan](name, children, tableFunctionRegistry)
+ // Test only
Review comment:
I am curious: why are these two public methods, `lookupFunction` (test only)
and `lookupTableFunction`, removed here?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
##########
@@ -30,8 +30,14 @@ case class ResolveTableValuedFunctions(catalog:
SessionCatalog) extends Rule[Log
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved)
=>
withPosition(u) {
+ val maybeBuiltinOrTempFunc = if (u.name.database.isEmpty) {
+ catalog.resolveBuiltinOrTempTableFunction(u.name.funcName,
u.functionArgs)
+ } else {
+ None
+ }
val resolvedFunc = try {
- catalog.lookupTableFunction(u.name, u.functionArgs)
+ maybeBuiltinOrTempFunc.getOrElse(
Review comment:
Ditto. How about encapsulating this logic into `lookupTableFunction`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -1540,130 +1547,181 @@ class SessionCatalog(
}
/**
- * Look up the [[ExpressionInfo]] associated with the specified function,
assuming it exists.
+ * Look up the `ExpressionInfo` of the given function by name if it's a
built-in or temp function.
+ * This supports both scalar and table functions.
*/
- def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo =
synchronized {
- // TODO: just make function registry take in FunctionIdentifier instead of
duplicating this
+ def lookupBuiltinOrTempFunction(name: String): Option[ExpressionInfo] = {
+
FunctionRegistry.builtinOperators.get(name.toLowerCase(Locale.ROOT)).orElse {
+ def isBuiltin(ident: FunctionIdentifier): Boolean = {
+ FunctionRegistry.builtin.functionExists(ident) ||
+ TableFunctionRegistry.builtin.functionExists(ident)
+ }
+ def lookup(ident: FunctionIdentifier): Option[ExpressionInfo] = {
+ functionRegistry.lookupFunction(ident).orElse(
+ tableFunctionRegistry.lookupFunction(ident))
+ }
+ synchronized(lookupTempFuncWithViewContext(name, isBuiltin, lookup))
+ }
+ }
+
+ /**
+ * Looks up a built-in or temp scalar function by name and resolves it to an
Expression if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempFunction(name: String, arguments: Seq[Expression]):
Option[Expression] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, FunctionRegistry.builtin.functionExists,
functionRegistry)
+ }
+
+ /**
+ * Looks up a built-in or temp table function by name and resolves it to a
LogicalPlan if such
+ * a function exists.
+ */
+ def resolveBuiltinOrTempTableFunction(
+ name: String, arguments: Seq[Expression]): Option[LogicalPlan] = {
+ resolveBuiltinOrTempFunctionInternal(
+ name, arguments, TableFunctionRegistry.builtin.functionExists,
tableFunctionRegistry)
+ }
+
+ private def resolveBuiltinOrTempFunctionInternal[T](
+ name: String,
+ arguments: Seq[Expression],
+ isBuiltin: FunctionIdentifier => Boolean,
+ registry: FunctionRegistryBase[T]): Option[T] = synchronized {
+ val funcIdent = FunctionIdentifier(name)
+ if (!registry.functionExists(funcIdent)) {
+ None
+ } else {
+ lookupTempFuncWithViewContext(
+ name, isBuiltin, ident => Option(registry.lookupFunction(ident,
arguments)))
+ }
+ }
+
+ private def lookupTempFuncWithViewContext[T](
+ name: String,
+ isBuiltin: FunctionIdentifier => Boolean,
+ lookupFunc: FunctionIdentifier => Option[T]): Option[T] = {
+ val funcIdent = FunctionIdentifier(name)
+ if (isBuiltin(funcIdent)) {
+ lookupFunc(funcIdent)
+ } else {
+ val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
+ val referredTempFunctionNames =
AnalysisContext.get.referredTempFunctionNames
+ if (isResolvingView) {
+ // When resolving a view, only return a temp function if it's referred
by this view.
+ if (referredTempFunctionNames.contains(name)) {
+ lookupFunc(funcIdent)
+ } else {
+ None
+ }
+ } else {
+ val result = lookupFunc(funcIdent)
+ if (result.isDefined) {
+ // We are not resolving a view and the function is a temp one, add
it to
+ // `AnalysisContext`, so during the view creation, we can save all
referred temp
+ // functions to view metadata.
+ AnalysisContext.get.referredTempFunctionNames.add(name)
+ }
+ result
+ }
+ }
+ }
+
+ /**
+ * Look up the `ExpressionInfo` of the given function by name if it's a
persistent function.
+ * This supports both scalar and table functions.
+ */
+ def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
val database =
name.database.orElse(Some(currentDb)).map(formatDatabaseName)
val qualifiedName = name.copy(database = database)
- functionRegistry.lookupFunction(name)
- .orElse(functionRegistry.lookupFunction(qualifiedName))
- .orElse(tableFunctionRegistry.lookupFunction(name))
+ functionRegistry.lookupFunction(qualifiedName)
+ .orElse(tableFunctionRegistry.lookupFunction(qualifiedName))
.getOrElse {
val db = qualifiedName.database.get
requireDbExists(db)
if (externalCatalog.functionExists(db, name.funcName)) {
val metadata = externalCatalog.getFunction(db, name.funcName)
- new ExpressionInfo(
- metadata.className,
- qualifiedName.database.orNull,
- qualifiedName.identifier,
- null,
- "",
- "",
- "",
- "",
- "",
- "",
- "hive")
+ makeExprInfoForHiveFunction(metadata.copy(identifier =
qualifiedName))
} else {
failFunctionLookup(name)
}
}
}
/**
- * Look up a specific function, assuming it exists.
- *
- * For a temporary function or a permanent function that has been loaded,
- * this method will simply lookup the function through the
- * FunctionRegistry and create an expression based on the builder.
- *
- * For a permanent function that has not been loaded, we will first fetch
its metadata
- * from the underlying external catalog. Then, we will load all resources
associated
- * with this function (i.e. jars and files). Finally, we create a function
builder
- * based on the function class and put the builder into the FunctionRegistry.
- * The name of this function in the FunctionRegistry will be
`databaseName.functionName`.
+ * Looks up a persistent scalar function by name and resolves it to an
Expression.
*/
- private def lookupFunction[T](
- name: FunctionIdentifier,
- children: Seq[Expression],
- registry: FunctionRegistryBase[T]): T = synchronized {
- import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-
- // Note: the implementation of this function is a little bit convoluted.
- // We probably shouldn't use a single FunctionRegistry to register all
three kinds of functions
- // (built-in, temp, and external).
- if (name.database.isEmpty && registry.functionExists(name)) {
- val referredTempFunctionNames =
AnalysisContext.get.referredTempFunctionNames
- val isResolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty
- // Lookup the function as a temporary or a built-in function (i.e.
without database) and
- // 1. if we are not resolving view, we don't care about the function
type and just return it.
- // 2. if we are resolving view, only return a temp function if it's
referred by this view.
- if (!isResolvingView ||
- !isTemporaryFunction(name) ||
- referredTempFunctionNames.contains(name.funcName)) {
- // We are not resolving a view and the function is a temp one, add it
to `AnalysisContext`,
- // so during the view creation, we can save all referred temp
functions to view metadata
- if (!isResolvingView && isTemporaryFunction(name)) {
- AnalysisContext.get.referredTempFunctionNames.add(name.funcName)
- }
- // This function has been already loaded into the function registry.
- return registry.lookupFunction(name, children)
- }
- }
+ def resolvePersistentFunction(
+ name: FunctionIdentifier, arguments: Seq[Expression]): Expression = {
+ resolvePersistentFunctionInternal(name, arguments, functionRegistry,
makeFunctionBuilder)
+ }
- // Get the database from AnalysisContext if it's defined, otherwise, use
current database
- val currentDatabase = AnalysisContext.get.catalogAndNamespace match {
- case Seq() => getCurrentDatabase
- case Seq(_, db) => db
- case Seq(catalog, namespace @ _*) =>
- throw new IllegalStateException(s"[BUG] unexpected v2 catalog:
$catalog, and " +
- s"namespace: ${namespace.quoted} in v1 function lookup")
- }
+ /**
+ * Looks up a persistent table function by name and resolves it to a
LogicalPlan.
Review comment:
Ditto: nit: `Looks` -> `Look` (use the imperative mood, as in the other Scaladoc summaries).
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
##########
@@ -258,6 +258,14 @@ object QueryCompilationErrors {
t.origin.line, t.origin.startPosition)
}
+ def expectPersistentFunc(
Review comment:
nit: rename `expectPersistentFunc` to `expectPersistentFuncError`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]