Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/12036#discussion_r57963712
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
---
@@ -55,61 +57,133 @@ private[hive] class HiveFunctionRegistry(
}
}
- override def lookupFunction(name: String, children: Seq[Expression]):
Expression = {
- Try(underlying.lookupFunction(name, children)).getOrElse {
- // We only look it up to see if it exists, but do not include it in
the HiveUDF since it is
- // not always serializable.
- val functionInfo: FunctionInfo =
- Option(getFunctionInfo(name.toLowerCase)).getOrElse(
- throw new AnalysisException(s"undefined function $name"))
+ def loadHivePermanentFunction(name: String): Option[CatalogFunction] = {
+ val databaseName = sessionStage.catalog.getCurrentDatabase
+ val func = FunctionIdentifier(name, Option(databaseName))
+ val catalogFunc =
+ if (sessionStage.catalog.listFunctions(databaseName, name).size !=
0) {
+ Some(sessionStage.catalog.getFunction(func))
+ } else {
+ None
+ }
+ catalogFunc.map(_.resources.foreach { resource =>
+ resource._1.toLowerCase match {
+ case "jar" => sessionStage.ctx.addJar(resource._2)
+ case _ =>
+ sessionStage.ctx.runSqlHive(s"ADD FILE ${resource._2}")
+ sessionStage.ctx.sparkContext.addFile(resource._2)
+ }
+ })
+ catalogFunc
+ }
- val functionClassName = functionInfo.getFunctionClass.getName
+ override def makeFunctionBuilderAndInfo(
+ name: String,
+ functionClassName: String): (ExpressionInfo, FunctionBuilder) = {
+ val hiveUDFWrapper = new HiveFunctionWrapper(functionClassName)
+ val hiveUDFClass = hiveUDFWrapper.createFunction().getClass
+ val info = new ExpressionInfo(functionClassName, name)
+ val builder = makeHiveUDFBuilder(name, functionClassName,
hiveUDFClass, hiveUDFWrapper)
+ (info, builder)
+ }
- // When we instantiate hive UDF wrapper class, we may throw
exception if the input expressions
- // don't satisfy the hive UDF, such as type mismatch, input number
mismatch, etc. Here we
- // catch the exception and throw AnalysisException instead.
+ /**
+ * Generates a Spark FunctionBuilder for a Hive UDF which is specified
by a given classname.
+ */
+ def makeHiveUDFBuilder(
+ name: String,
+ functionClassName: String,
+ hiveUDFClass: Class[_],
+ hiveUDFWrapper: HiveFunctionWrapper): FunctionBuilder = {
+ val builder = (children: Seq[Expression]) => {
try {
- if
(classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) {
+ if (classOf[GenericUDFMacro].isAssignableFrom(hiveUDFClass)) {
val udf = HiveGenericUDF(
- name, new HiveFunctionWrapper(functionClassName,
functionInfo.getGenericUDF), children)
- udf.dataType // Force it to check input data types.
+ name, hiveUDFWrapper, children)
+ if (udf.resolved) {
+ udf.dataType // Force it to check input data types.
+ }
udf
- } else if
(classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveSimpleUDF(name, new
HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
+ } else if (classOf[UDF].isAssignableFrom(hiveUDFClass)) {
+ val udf = HiveSimpleUDF(name, hiveUDFWrapper, children)
+ if (udf.resolved) {
+ udf.dataType // Force it to check input data types.
+ }
udf
- } else if
(classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(name, new
HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
+ } else if (classOf[GenericUDF].isAssignableFrom(hiveUDFClass)) {
+ val udf = HiveGenericUDF(name, hiveUDFWrapper, children)
+ if (udf.resolved) {
+ udf.dataType // Force it to check input data types.
+ }
udf
} else if (
-
classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass))
{
- val udaf = HiveUDAFFunction(name, new
HiveFunctionWrapper(functionClassName), children)
- udaf.dataType // Force it to check input data types.
+
classOf[AbstractGenericUDAFResolver].isAssignableFrom(hiveUDFClass)) {
+ val udaf = HiveUDAFFunction(name, hiveUDFWrapper, children)
+ if (udaf.resolved) {
+ udaf.dataType // Force it to check input data types.
+ }
udaf
- } else if
(classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
+ } else if (classOf[UDAF].isAssignableFrom(hiveUDFClass)) {
val udaf = HiveUDAFFunction(
- name, new HiveFunctionWrapper(functionClassName), children,
isUDAFBridgeRequired = true)
- udaf.dataType // Force it to check input data types.
+ name, hiveUDFWrapper, children, isUDAFBridgeRequired = true)
+ if (udaf.resolved) {
+ udaf.dataType // Force it to check input data types.
+ }
udaf
- } else if
(classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udtf = HiveGenericUDTF(name, new
HiveFunctionWrapper(functionClassName), children)
- udtf.elementTypes // Force it to check input data types.
+ } else if (classOf[GenericUDTF].isAssignableFrom(hiveUDFClass)) {
+ val udtf = HiveGenericUDTF(name, hiveUDFWrapper, children)
+ if (udtf.resolved) {
+ udtf.elementTypes // Force it to check input data types.
+ }
udtf
} else {
- throw new AnalysisException(s"No handler for udf
${functionInfo.getFunctionClass}")
+ throw new AnalysisException(s"No handler for udf
${hiveUDFClass}")
}
} catch {
case analysisException: AnalysisException =>
- // If the exception is an AnalysisException, just throw it.
throw analysisException
case throwable: Throwable =>
- // If there is any other error, we throw an AnalysisException.
- val errorMessage = s"No handler for Hive udf
${functionInfo.getFunctionClass} " +
+ val errorMessage = s"No handler for Hive udf ${hiveUDFClass} " +
s"because: ${throwable.getMessage}."
throw new AnalysisException(errorMessage)
}
}
+ builder
+ }
+
+ override def lookupFunction(name: String, children: Seq[Expression]):
Expression = {
+ val builder = underlying.lookupFunctionBuilder(name)
+ if (builder.isDefined) {
+ builder.get(children)
+ } else {
+ // We only look it up to see if it exists, but do not include it in
the HiveUDF since it is
+ // not always serializable.
+ val optFunctionInfo = Option(getFunctionInfo(name.toLowerCase))
+ if (optFunctionInfo.isEmpty) {
+ val catalogFunc = loadHivePermanentFunction(name).getOrElse(
--- End diff --
@viirya can you explain what we're doing here? If `getFunctionInfo` returns
null, why would the session catalog know about the function? In the old code we
just threw an exception when `getFunctionInfo` returned null. Why do we try to do
something else here?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]