HeartSaVioR commented on a change in pull request #27025: [SPARK-26560][SQL]
Spark should be able to run Hive UDF using jar regardless of current thread
context classloader
URL: https://github.com/apache/spark/pull/27025#discussion_r399617719
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
##########
@@ -66,49 +66,52 @@ private[sql] class HiveSessionCatalog(
name: String,
clazz: Class[_],
input: Seq[Expression]): Expression = {
-
- Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
- var udfExpr: Option[Expression] = None
- try {
- // When we instantiate hive UDF wrapper class, we may throw exception
if the input
- // expressions don't satisfy the hive UDF, such as type mismatch,
input number
- // mismatch, etc. Here we catch the exception and throw
AnalysisException instead.
- if (classOf[UDF].isAssignableFrom(clazz)) {
- udfExpr = Some(HiveSimpleUDF(name, new
HiveFunctionWrapper(clazz.getName), input))
- udfExpr.get.dataType // Force it to check input data types.
- } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
- udfExpr = Some(HiveGenericUDF(name, new
HiveFunctionWrapper(clazz.getName), input))
- udfExpr.get.dataType // Force it to check input data types.
- } else if
(classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
- udfExpr = Some(HiveUDAFFunction(name, new
HiveFunctionWrapper(clazz.getName), input))
- udfExpr.get.dataType // Force it to check input data types.
- } else if (classOf[UDAF].isAssignableFrom(clazz)) {
- udfExpr = Some(HiveUDAFFunction(
- name,
- new HiveFunctionWrapper(clazz.getName),
- input,
- isUDAFBridgeRequired = true))
- udfExpr.get.dataType // Force it to check input data types.
- } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
- udfExpr = Some(HiveGenericUDTF(name, new
HiveFunctionWrapper(clazz.getName), input))
- udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it
to check data types.
+ // Current thread context classloader may not be the one loaded the class.
Need to switch
+ // context classloader to initialize instance properly.
+ Utils.withContextClassLoader(clazz.getClassLoader) {
+ Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
+ var udfExpr: Option[Expression] = None
+ try {
+ // When we instantiate hive UDF wrapper class, we may throw
exception if the input
+ // expressions don't satisfy the hive UDF, such as type mismatch,
input number
+ // mismatch, etc. Here we catch the exception and throw
AnalysisException instead.
+ if (classOf[UDF].isAssignableFrom(clazz)) {
+ udfExpr = Some(HiveSimpleUDF(name, new
HiveFunctionWrapper(clazz.getName), input))
+ udfExpr.get.dataType // Force it to check input data types.
Review comment:
The experimental unit test code I used is below (added to SQLQuerySuite.scala):
```
test("SPARK-26560 ...experimenting Wenchen's comment...") {
  // Experimental reproduction: build a Hive UDTF expression while the thread context
  // classloader is the jar classloader, then copy and resolve it after the context
  // classloader has been rolled back to the Spark classloader.
  //
  // Force the Spark classloader as the context classloader first, because other tests
  // (even in other test suites) may have changed the current thread's context classloader
  // to the jar classloader.
  Utils.withContextClassLoader(Utils.getSparkClassLoader) {
    withUserDefinedFunction("udtf_count3" -> false) {
      val sparkClassLoader = Thread.currentThread().getContextClassLoader

      // This jar file should not be placed on the classpath; GenericUDTFCount3 is a
      // slightly modified version of GenericUDTFCount2 in hive/contrib, which emits the
      // count three times.
      val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar"
      val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
      val className = "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3"

      // The trailing `|` margin line keeps the statement text ending in a bare newline
      // while still allowing the literal to be indented.
      sql(
        s"""
           |CREATE FUNCTION udtf_count3
           |AS '$className'
           |USING JAR '$jarURL'
           |""".stripMargin)

      // Registering the function alone must not touch the context classloader.
      assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

      // The JAR is loaded at first usage, and that changes the current thread's context
      // classloader to the jar classloader in sharedState.
      // See SessionState.addJar for details.
      sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t")
      assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
      assert(Thread.currentThread().getContextClassLoader eq
        spark.sqlContext.sharedState.jarClassLoader)

      // Construct the UDTF here, i.e. while the context classloader is the jar
      // classloader (asserted above), which is the one that loads the UDF class.
      val name = "default.udtf_count3"
      val input = Array(AttributeReference("a", IntegerType, nullable = false)())
      val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(className), input)

      // FIXME: uncommenting the line below makes the test pass
      // udf.dataType

      // Roll back to the original classloader and run the query again. Without this line
      // the test would pass, as the thread's context classloader has been changed to the
      // jar classloader. But the thread context classloader can be changed by others as
      // well, which would fail the query; one example is spark-shell, whose thread context
      // classloader rolls back automatically. This mimics the behavior of spark-shell.
      Thread.currentThread().setContextClassLoader(sparkClassLoader)

      // FIXME: doing this "within" the context classloader which loads the UDF class makes
      // the test pass even if udf.dataType above stays commented out
      val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)
      newUdf.dataType
    }
  }
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]