How can I dynamically register UDFs via reflection? Registration fails with the exception below:
java.lang.UnsupportedOperationException: Schema for type _$13 is not
supported
at
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145)
at
com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115)
at
io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759)
at
io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446)
at
io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368)
at
io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
/** Marker annotation for classes that should be registered as Spark UDFs.
  *
  * @param name the name under which the annotated function is registered.
  */
final class sparkFunc(val name: String) extends StaticAnnotation
/** Scans the functions package for classes annotated with @sparkFunc and
  * registers each one as a UDF under the name carried by the annotation.
  *
  * Supports implementations of Function1/Function2/Function3; any other
  * annotated class is reported via error() and skipped.
  *
  * @param hiveContext the context whose UDF registry receives the functions
  */
def registerFunc(hiveContext: HiveContext): Unit = {
  info("register udf function")
  val ru = scala.reflect.runtime.universe
  val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader)
  new FastClasspathScanner("com.alibaba.spark.odps.driver.functions")
    .matchAllClasses(new ClassMatchProcessor() {
      override def processMatch(aClass: Class[_]): Unit = {
        val classMirror = classLoaderMirror.classSymbol(aClass)
        // Option instead of getOrElse(null): absence is modeled explicitly.
        val annotation = classMirror.annotations.find(_.tpe =:= ru.typeOf[sparkFunc])
        try {
          annotation.foreach { ann =>
            // The annotation prints as e.g. sparkFunc("myUdf"); pull out the quoted name.
            val funcName = StringUtils.substringBetween(ann.toString, "\"", "\"")
            // NOTE(review): register() derives the Catalyst schema from TypeTags, and
            // existential types such as Function1[_, _] have no schema — this is what
            // produces "Schema for type _$13 is not supported" at runtime. To register
            // dynamically loaded functions, wrap them as
            // org.apache.spark.sql.api.java.UDF1/2/3 and call the register overload
            // that takes an explicit return DataType.
            if (chekClazz(aClass, classOf[Function1[_, _]])) {
              val func: Function1[_, _] = createInstance[Function1[_, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else if (chekClazz(aClass, classOf[Function2[_, _, _]])) {
              val func: Function2[_, _, _] = createInstance[Function2[_, _, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else if (chekClazz(aClass, classOf[Function3[_, _, _, _]])) {
              val func: Function3[_, _, _, _] = createInstance[Function3[_, _, _, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else {
              throw new RuntimeException("not support function")
            }
            info("== register function: {}", funcName)
          }
        } catch {
          // Keep scanning remaining classes even if one registration fails.
          case e: Exception => error(e.getMessage, e)
        }
      }
    }).scan()
}
/** Returns true when `sClass` can be viewed as a subtype of `pClass`.
  *
  * Replaces the original asSubclass-and-catch pattern: isAssignableFrom is the
  * direct boolean check, so no exception is used for control flow. Null
  * arguments yield false, matching the original's catch-based result.
  */
private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean =
  sClass != null && pClass != null && pClass.isAssignableFrom(sClass)
/** Reflectively instantiates `clazz` via its no-arg constructor and casts the
  * result to `T`.
  *
  * @return Success(instance) when construction works and the object is a `T`;
  *         Failure with a ClassCastException when it is not, or with the
  *         constructor's own exception (an InvocationTargetException is
  *         unwrapped to its target) when construction fails.
  */
private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = {
  Try {
    val constructor = clazz.getDeclaredConstructor()
    // Allow non-public constructors (e.g. package-private function classes).
    constructor.setAccessible(true)
    val obj = constructor.newInstance()
    val t = implicitly[ClassTag[T]].runtimeClass
    if (t.isInstance(obj)) obj.asInstanceOf[T]
    else throw new ClassCastException(s"${clazz.getName} is not a subtype of $t")
  } recoverWith {
    // Surface the constructor's real exception instead of the reflective wrapper;
    // recoverWith + Failure is the idiomatic form of the original throw-in-recover.
    case i: InvocationTargetException if i.getTargetException ne null =>
      Failure(i.getTargetException)
  }
}