HeartSaVioR commented on a change in pull request #27025: [SPARK-26560][SQL] Spark should be able to run Hive UDF using jar regardless of current thread context classloader URL: https://github.com/apache/spark/pull/27025#discussion_r399617719
########## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala ########## @@ -66,49 +66,52 @@ private[sql] class HiveSessionCatalog( name: String, clazz: Class[_], input: Seq[Expression]): Expression = { - - Try(super.makeFunctionExpression(name, clazz, input)).getOrElse { - var udfExpr: Option[Expression] = None - try { - // When we instantiate hive UDF wrapper class, we may throw exception if the input - // expressions don't satisfy the hive UDF, such as type mismatch, input number - // mismatch, etc. Here we catch the exception and throw AnalysisException instead. - if (classOf[UDF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[GenericUDF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) { - udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[UDAF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveUDAFFunction( - name, - new HiveFunctionWrapper(clazz.getName), - input, - isUDAFBridgeRequired = true)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types. + // Current thread context classloader may not be the one loaded the class. Need to switch + // context classloader to initialize instance properly. 
+ Utils.withContextClassLoader(clazz.getClassLoader) { + Try(super.makeFunctionExpression(name, clazz, input)).getOrElse { + var udfExpr: Option[Expression] = None + try { + // When we instantiate hive UDF wrapper class, we may throw exception if the input + // expressions don't satisfy the hive UDF, such as type mismatch, input number + // mismatch, etc. Here we catch the exception and throw AnalysisException instead. + if (classOf[UDF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. Review comment: The experimental UT code I used is below (added to SQLQuerySuite.scala) : ``` test("SPARK-26560 ...experimenting Wenchen's comment...") { // force to use Spark classloader as other test (even in other test suites) may change the // current thread's context classloader to jar classloader Utils.withContextClassLoader(Utils.getSparkClassLoader) { withUserDefinedFunction("udtf_count3" -> false) { val sparkClassLoader = Thread.currentThread().getContextClassLoader // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for // three times. val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar" val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" val className = "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3" sql( s""" |CREATE FUNCTION udtf_count3 |AS '$className' |USING JAR '$jarURL' """.stripMargin) assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader) // JAR will be loaded at first usage, and it will change the current thread's // context classloader to jar classloader in sharedState. // See SessionState.addJar for details. 
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t") assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader) assert(Thread.currentThread().getContextClassLoader eq spark.sqlContext.sharedState.jarClassLoader) // uses classloader which loads clazz val name = "default.udtf_count3" val input = Array(AttributeReference("a", IntegerType, nullable = false)()) val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(className), input) // FIXME: uncommenting the line below will make the test pass // udf.dataType // Roll back to the original classloader and run query again. Without this line, the test // would pass, as the thread's context classloader is changed to the jar classloader. But the thread // context classloader can be changed by others as well, which would fail the query; one // example is spark-shell, whose thread context classloader rolls back automatically. This // mimics the behavior of spark-shell. Thread.currentThread().setContextClassLoader(sparkClassLoader) // FIXME: doing this "within" the context classloader which loads the UDF class will // make the test pass even if we comment out udf.dataType val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray) newUdf.dataType } } } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org