Could you please provide more context about what you are trying to do here?
On Thu, Dec 15, 2016 at 6:27 PM 李斌松 <[email protected]> wrote: > How to reflect dynamic registration udf? > > java.lang.UnsupportedOperationException: Schema for type _$13 is not > supported > at > org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153) > at > org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) > at > org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64) > at > org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) > at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145) > at > com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115) > at > io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759) > at > io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446) > at > io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368) > at > io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > final class sparkFunc(val name: String) extends StaticAnnotation{} > > def registerFunc(hiveContext: HiveContext): Unit = { > info("register udf function") > > val ru = scala.reflect.runtime.universe > val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader) > > new FastClasspathScanner("com.alibaba.spark.odps.driver.functions") > .matchAllClasses(new ClassMatchProcessor() { > override def processMatch(aClass: Class[_]): Unit = { > val classMirror = classLoaderMirror.classSymbol(aClass) > val annotation = classMirror.annotations.find(_.tpe =:= > ru.typeOf[sparkFunc]).getOrElse(null) > > try { > if (annotation != null) { > var funcName = > StringUtils.substringBetween(annotation.toString, "\"", "\"") > > if (chekClazz(aClass, classOf[Function1[_, _]])) { > val func: Function1[_, _] = > createInstance[Function1[_, _]](aClass).get > hiveContext.udf.register(funcName, func) > } else if (chekClazz(aClass, classOf[Function2[_, _, > _]])) { > val func: Function2[_, _, _] = > createInstance[Function2[_, _, _]](aClass).get > hiveContext.udf.register(funcName, func) > } else if (chekClazz(aClass, classOf[Function3[_, _, > _, _]])) { > val func: Function3[_, _, _, _] = > createInstance[Function3[_, _, _, _]](aClass).get > hiveContext.udf.register(funcName, func) > } else { > throw new RuntimeException("not support function") > } > > info("== register function: {}", funcName) > } > } catch { > case e: Exception => error(e.getMessage, e) > } > } > }).scan() > } > > private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean = { > try { > sClass.asSubclass(pClass) > true > } catch { > case e: Exception => false > } > } > > private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = { > Try { > val constructor = clazz.getDeclaredConstructor() > constructor.setAccessible(true) > val obj = constructor.newInstance() > val t = implicitly[ClassTag[T]].runtimeClass > if (t.isInstance(obj)) { > obj.asInstanceOf[T] > } else throw new ClassCastException(clazz.getName + " is not a > subtype of " + t) > } recover { > case i: InvocationTargetException if i.getTargetException ne null ⇒ > throw i.getTargetException > } > } > >
