HeartSaVioR commented on a change in pull request #27025: [SPARK-26560][SQL] Spark should be able to run Hive UDF using jar regardless of current thread context classloader
URL: https://github.com/apache/spark/pull/27025#discussion_r399617719
 
 

 ##########
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
 ##########
 @@ -66,49 +66,52 @@ private[sql] class HiveSessionCatalog(
       name: String,
       clazz: Class[_],
       input: Seq[Expression]): Expression = {
-
-    Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
-      var udfExpr: Option[Expression] = None
-      try {
-        // When we instantiate hive UDF wrapper class, we may throw exception if the input
-        // expressions don't satisfy the hive UDF, such as type mismatch, input number
-        // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
-        if (classOf[UDF].isAssignableFrom(clazz)) {
-          udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
-          udfExpr.get.dataType // Force it to check input data types.
-        } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
-          udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input))
-          udfExpr.get.dataType // Force it to check input data types.
-        } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
-          udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input))
-          udfExpr.get.dataType // Force it to check input data types.
-        } else if (classOf[UDAF].isAssignableFrom(clazz)) {
-          udfExpr = Some(HiveUDAFFunction(
-            name,
-            new HiveFunctionWrapper(clazz.getName),
-            input,
-            isUDAFBridgeRequired = true))
-          udfExpr.get.dataType // Force it to check input data types.
-        } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
-          udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
-          udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
+    // Current thread context classloader may not be the one loaded the class. Need to switch
+    // context classloader to initialize instance properly.
+    Utils.withContextClassLoader(clazz.getClassLoader) {
+      Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
+        var udfExpr: Option[Expression] = None
+        try {
+          // When we instantiate hive UDF wrapper class, we may throw exception if the input
+          // expressions don't satisfy the hive UDF, such as type mismatch, input number
+          // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
+          if (classOf[UDF].isAssignableFrom(clazz)) {
+            udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
+            udfExpr.get.dataType // Force it to check input data types.
 
 Review comment:
   The experimental UT code I used is below (added to SQLQuerySuite.scala):
   
   ```
   test("SPARK-26560 ...experimenting Wenchen's comment...") {
     // Force the Spark classloader here, as other tests (even in other test suites) may
     // have changed the current thread's context classloader to the jar classloader.
     Utils.withContextClassLoader(Utils.getSparkClassLoader) {
       withUserDefinedFunction("udtf_count3" -> false) {
         val sparkClassLoader = Thread.currentThread().getContextClassLoader

         // This jar file should not be placed on the classpath; GenericUDTFCount3 is a
         // slightly modified version of GenericUDTFCount2 in hive/contrib, which emits
         // the count three times.
         val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar"
         val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

         val className = "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3"
         sql(
           s"""
              |CREATE FUNCTION udtf_count3
              |AS '$className'
              |USING JAR '$jarURL'
            """.stripMargin)

         assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

         // The JAR is loaded at first usage, which changes the current thread's context
         // classloader to the jar classloader in sharedState.
         // See SessionState.addJar for details.
         sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t")

         assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
         assert(Thread.currentThread().getContextClassLoader eq
           spark.sqlContext.sharedState.jarClassLoader)

         // Use the classloader which loaded clazz.
         val name = "default.udtf_count3"

         val input = Array(AttributeReference("a", IntegerType, nullable = false)())
         val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(className), input)
         // FIXME: uncommenting the line below makes the test pass.
         // udf.dataType

         // Roll back to the original classloader and run the query again. Without this
         // line the test would pass, since the thread's context classloader has been
         // changed to the jar classloader. But the context classloader can be changed
         // by others as well, which would fail the query; one example is spark-shell,
         // which rolls the thread context classloader back automatically. This line
         // mimics that behavior.
         Thread.currentThread().setContextClassLoader(sparkClassLoader)

         // FIXME: doing this "within" the context classloader which loaded the UDF class
         //   makes the test pass even with udf.dataType commented out (see the second
         //   sketch after this block).
         val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)

         newUdf.dataType
       }
     }
   }
   ```
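
   For context, the fix in the diff above works by temporarily swapping the thread's
   context classloader around UDF instantiation. Here is a minimal sketch of the
   set-run-restore pattern such a helper follows (an assumed shape; the actual
   Utils.withContextClassLoader may differ):

   ```
   // Sketch only: run fn with classLoader installed as the thread's context
   // classloader, restoring the previous loader afterwards.
   def withContextClassLoader[T](classLoader: ClassLoader)(fn: => T): T = {
     val oldClassLoader = Thread.currentThread().getContextClassLoader
     try {
       Thread.currentThread().setContextClassLoader(classLoader)
       fn
     } finally {
       // Restore in finally so the loader is never leaked, even if fn throws.
       Thread.currentThread().setContextClassLoader(oldClassLoader)
     }
   }
   ```

   The diff wraps makeFunctionExpression in exactly this kind of scope, so instantiation
   resolves classes via the loader that defined clazz rather than whatever loader the
   calling thread happens to carry.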

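   To illustrate the second FIXME: performing the copy while the jar classloader is the
   thread's context classloader makes the test pass even with udf.dataType commented
   out. A sketch of that variant, reusing udf and the sharedState.jarClassLoader from
   the test above:

   ```
   // Variant of the second FIXME: re-instantiate the UDF while the jar classloader
   // (the loader that loaded the UDF class) is the thread's context classloader.
   // Resolving the copy's schema then succeeds without the earlier udf.dataType call.
   val jarClassLoader = spark.sqlContext.sharedState.jarClassLoader
   val newUdf = Utils.withContextClassLoader(jarClassLoader) {
     udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)
   }
   newUdf.dataType
   ```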