Could you please provide more context about what you are trying to do here?

On Thu, Dec 15, 2016 at 6:27 PM 李斌松 <[email protected]> wrote:

> How can I dynamically register UDFs via reflection?
>
> java.lang.UnsupportedOperationException: Schema for type _$13 is not supported
> at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
> at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
> at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
> at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145)
> at com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115)
> at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759)
> at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446)
> at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368)
> at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> final class sparkFunc(val name: String) extends StaticAnnotation{}
>
> def registerFunc(hiveContext: HiveContext): Unit = {
>     info("register udf function")
>
>     val ru = scala.reflect.runtime.universe
>     val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader)
>
>     new FastClasspathScanner("com.alibaba.spark.odps.driver.functions")
>         .matchAllClasses(new ClassMatchProcessor() {
>             override def processMatch(aClass: Class[_]): Unit = {
>                 val classMirror = classLoaderMirror.classSymbol(aClass)
>                 val annotation = classMirror.annotations.find(_.tpe =:= 
> ru.typeOf[sparkFunc]).getOrElse(null)
>
>                 try {
>                     if (annotation != null) {
>                         var funcName = 
> StringUtils.substringBetween(annotation.toString, "\"", "\"")
>
>                         if (chekClazz(aClass, classOf[Function1[_, _]])) {
>                             val func: Function1[_, _] = 
> createInstance[Function1[_, _]](aClass).get
>                             hiveContext.udf.register(funcName, func)
>                         } else if (chekClazz(aClass, classOf[Function2[_, _, 
> _]])) {
>                             val func: Function2[_, _, _] = 
> createInstance[Function2[_, _, _]](aClass).get
>                             hiveContext.udf.register(funcName, func)
>                         } else if (chekClazz(aClass, classOf[Function3[_, _, 
> _, _]])) {
>                             val func: Function3[_, _, _, _] = 
> createInstance[Function3[_, _, _, _]](aClass).get
>                             hiveContext.udf.register(funcName, func)
>                         } else {
>                             throw new RuntimeException("not support function")
>                         }
>
>                         info("== register function: {}", funcName)
>                     }
>                 } catch {
>                     case e: Exception => error(e.getMessage, e)
>                 }
>             }
>         }).scan()
> }
>
> private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean = {
>     try {
>         sClass.asSubclass(pClass)
>         true
>     } catch {
>         case e: Exception => false
>     }
> }
>
> private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = {
>     Try {
>         val constructor = clazz.getDeclaredConstructor()
>         constructor.setAccessible(true)
>         val obj = constructor.newInstance()
>         val t = implicitly[ClassTag[T]].runtimeClass
>         if (t.isInstance(obj)) {
>             obj.asInstanceOf[T]
>         } else throw new ClassCastException(clazz.getName + " is not a 
> subtype of " + t)
>     } recover {
>         case i: InvocationTargetException if i.getTargetException ne null ⇒ 
> throw i.getTargetException
>     }
> }
>
>

Reply via email to