Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/12117#discussion_r58329324
--- Diff:
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala ---
@@ -112,4 +124,121 @@ class HiveSessionCatalog(
metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
}
+ override def makeFunctionBuilder(funcName: String, className: String):
FunctionBuilder = {
+ makeFunctionBuilder(funcName, Utils.classForName(className))
+ }
+
+ /**
+ * Construct a [[FunctionBuilder]] based on the provided class that
represents a function.
+ */
+ private def makeFunctionBuilder(name: String, clazz: Class[_]):
FunctionBuilder = {
+ // When we instantiate hive UDF wrapper class, we may throw exception
if the input
+ // expressions don't satisfy the hive UDF, such as type mismatch,
input number
+ // mismatch, etc. Here we catch the exception and throw
AnalysisException instead.
+ (children: Seq[Expression]) => {
+ try {
+ if (classOf[UDF].isAssignableFrom(clazz)) {
+ val udf = HiveSimpleUDF(name, new
HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
+ val udf = HiveGenericUDF(name, new
HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if
(classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(name, new
HiveFunctionWrapper(clazz.getName), children)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[UDAF].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(
+ name,
+ new HiveFunctionWrapper(clazz.getName),
+ children,
+ isUDAFBridgeRequired = true)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
+ val udtf = HiveGenericUDTF(name, new
HiveFunctionWrapper(clazz.getName), children)
+ udtf.elementTypes // Force it to check input data types.
+ udtf
+ } else {
+ throw new AnalysisException(s"No handler for Hive UDF
'${clazz.getCanonicalName}'")
+ }
+ } catch {
+ case ae: AnalysisException =>
+ throw ae
+ case NonFatal(e) =>
+ val analysisException =
+ new AnalysisException(s"No handler for Hive UDF
'${clazz.getCanonicalName}': $e")
+ analysisException.setStackTrace(e.getStackTrace)
+ throw analysisException
+ }
+ }
+ }
+
+ // We have a list of Hive built-in functions that we do not support. So,
we will check
+ // Hive's function registry and lazily load needed functions into our
own function registry.
+ // Those Hive built-in functions are
+ // assert_true, collect_list, collect_set, compute_stats,
context_ngrams, create_union,
+ // current_user, elt, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty,
ewah_bitmap_or, field,
+ // histogram_numeric, in_file, index, inline, java_method, map_keys,
map_values,
+ // matchpath, ngrams, noop, noopstreaming, noopwithmap,
noopwithmapstreaming,
+ // parse_url, parse_url_tuple, percentile, percentile_approx,
posexplode, reflect, reflect2,
+ // regexp, sentences, stack, std, str_to_map, windowingtablefunction,
xpath, xpath_boolean,
+ // xpath_double, xpath_float, xpath_int, xpath_long, xpath_number,
+ // xpath_short, and xpath_string.
+ override def lookupFunction(name: String, children: Seq[Expression]):
Expression = {
+ Try(super.lookupFunction(name, children)) match {
+ case Success(expr) => expr
+ case Failure(error) =>
+ if (functionRegistry.functionExists(name)) {
+ // If the function actually exists in functionRegistry, it means
that there is an
+ // error when we create the Expression using the given children.
+ // We need to throw the original exception.
+ throw error
+ } else {
+ // This function is not in functionRegistry, let's try to load
it as a Hive's
+ // built-in function.
+ val functionName = name.toLowerCase
+ // TODO: This may not really work for current_user because
current_user is not evaluated
+ // with session info.
+ // We do not need to use executionHive here because we only
load
+ // Hive's builtin functions, which do not need current db.
+ val functionInfo = {
+ try {
+
Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
+ throw new AnalysisException(s"Undefined Hive UDF: $name"))
+ } catch {
+ // If HiveFunctionRegistry.getFunctionInfo throws an
exception,
+ // we are failing to load a Hive builtin function, which
means that
+ // the given function is not a Hive builtin function.
+ case NonFatal(e) => throw new AnalysisException(s"Undefined
Hive UDF: $name")
+ }
+ }
+ val className = functionInfo.getFunctionClass.getName
+ val builder = makeFunctionBuilder(functionName, className)
+ // Put this Hive built-in function to our function registry.
+ val info = new ExpressionInfo(className, functionName)
+ createTempFunction(functionName, info, builder, ignoreIfExists =
false)
+ // Now, we need to create the Expression.
+ functionRegistry.lookupFunction(functionName, children)
+ }
+ }
+ }
+
+ // Pre-load a few commonly used Hive built-in functions.
+ HiveSessionCatalog.preloadedHiveBuiltinFunctions.foreach {
--- End diff --
I'd like to keep this pre-loading block close to `lookupFunction` and the
object defined below, since they are all related to functions.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]