Hi Guowei,

Thanks for your help. Here is the relevant code (from the QueryCommand class):
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()
val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

// Compile myFunction.java at runtime; without -d, javac writes the .class file next to the source.
val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

// Load the compiled class through a new class loader and instantiate it.
val root = new File("target/custom")
val classLoader: URLClassLoader =
  new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.getDeclaredConstructor().newInstance() // Class.newInstance() is deprecated
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

// creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

def execute(): Unit = fsEnv.execute()
myFunction.java:

import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {
    public String eval(String s) {
        return "myFunction - " + s;
    }
}
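For reference, the compile-and-load step above can be reproduced outside Flink with the JDK alone. This is a minimal sketch, assuming a full JDK (ToolProvider.getSystemJavaCompiler() returns null on a plain JRE). The class Greeter and the helper names are hypothetical stand-ins for myFunction, kept free of Flink dependencies so the sketch runs on its own. It also checks which class loaders can see the freshly compiled class.

```java
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class RuntimeUdfLoading {

    // Hypothetical stand-in for myFunction.java, without the Flink dependency.
    static final String SOURCE =
        "public class Greeter {\n"
        + "  public String eval(String s) { return \"Greeter - \" + s; }\n"
        + "}\n";

    /** Writes SOURCE into dir, compiles it, and returns a loader that can see the class. */
    static URLClassLoader compileAndLoad(Path dir) throws IOException {
        Path src = dir.resolve("Greeter.java");
        Files.write(src, SOURCE.getBytes(StandardCharsets.UTF_8));
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler (running on a JRE?)");
        }
        // run(...) returns 0 on success; without -d the class file lands next to the source.
        int exit = compiler.run(null, null, null, src.toString());
        if (exit != 0) {
            throw new IllegalStateException("Compilation failed with exit code " + exit);
        }
        return new URLClassLoader(
            new URL[] {dir.toUri().toURL()},
            RuntimeUdfLoading.class.getClassLoader());
    }

    /** Instantiates Greeter through the given loader and calls eval reflectively. */
    static String callEval(ClassLoader loader, String arg) throws Exception {
        Class<?> cls = loader.loadClass("Greeter");
        Object instance = cls.getDeclaredConstructor().newInstance();
        Method eval = cls.getMethod("eval", String.class);
        return (String) eval.invoke(instance, arg);
    }

    /** Returns true if the loader can resolve Greeter by name. */
    static boolean canSee(ClassLoader loader) {
        try {
            Class.forName("Greeter", false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("udf");
        try (URLClassLoader loader = compileAndLoad(dir)) {
            System.out.println(callEval(loader, "a"));                            // Greeter - a
            System.out.println(canSee(loader));                                   // true
            System.out.println(canSee(RuntimeUdfLoading.class.getClassLoader())); // false
        }
    }
}
```

The last check is the relevant one for the exception quoted below: any component that resolves the class by name through the application class loader, rather than through the URLClassLoader, would not find it.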
Execution works as follows: a QueryCommand instance is created, some properties are set, and then execute() is invoked.
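The stack trace in the quoted message shows the UDF class being resolved through ClassLoaders$AppClassLoader during code generation, not through the URLClassLoader that loaded it. A workaround often tried in this situation is to install the custom loader as the thread context class loader while the query is planned and submitted. This rests on an assumption to verify against the Flink version in use: that the planner uses the context class loader for this lookup. A remote cluster would additionally need the class on its own classpath. The sketch below shows only the save/set/restore pattern in plain Java; withContextClassLoader is a hypothetical helper, not a Flink API.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

public class ContextClassLoaderScope {

    /**
     * Runs action with loader installed as the current thread's context class
     * loader and restores the previous loader afterwards, even on failure.
     */
    static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Stand-in for the URLClassLoader that loaded myFunction in QueryCommand.
        try (URLClassLoader udfLoader =
                 new URLClassLoader(new URL[0], ContextClassLoaderScope.class.getClassLoader())) {
            boolean sawUdfLoader = withContextClassLoader(udfLoader,
                // In QueryCommand, the executeSql(...) / execute() calls would run here.
                () -> Thread.currentThread().getContextClassLoader() == udfLoader);
            System.out.println(sawUdfLoader);                                               // true
            System.out.println(Thread.currentThread().getContextClassLoader() == original); // true
        }
    }
}
```

Restoring the previous loader in a finally block keeps the change scoped to the wrapped call, so other code running later on the same thread is unaffected.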
Let me know if any other relevant information is missing. Alternatively, you can also have a look at the source code here: https://github.com/codefeedr/kafkaquery
Kind regards,
Jakub
________________________________
From: Guowei Ma <[email protected]>
Sent: Monday, 7 December 2020 02:55
To: Jakub N <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: Flink UDF registration from jar at runtime
Hi Jakub,

In theory there should not be any problem, since you can register the function object directly.
Could you share your code and the shell command you use to submit your job?
Best,
Guowei
On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[email protected]> wrote:
The current setup is: data in Kafka -> Kafka connector -> StreamTableEnvironment -> Flink SQL queries.
I would like to register Flink user-defined functions (UDFs) from a jar or a Java class file at runtime. So far, I have tried using a Java ClassLoader to obtain an instance of a ScalarFunction (UDF) and registering it in the StreamTableEnvironment. When I execute a query that uses the UDF, I get the following exception:
Exception in thread "main" java.lang.ClassNotFoundException: myFunction
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
    at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:285)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    ...
I have verified that the generated UDF instance behaves as expected when I invoke its methods directly.
Do you have any idea why this is failing?
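The instance working when invoked directly does not rule out this failure. The InstantiationUtil.deserializeObject and ClassLoaderObjectInputStream.resolveClass frames in the trace indicate that the planner serializes the function object and reads a copy back, looking the class up by name through a class loader. If that loader cannot see myFunction, the copy fails with ClassNotFoundException while the original object stays usable. The following minimal sketch reproduces that pattern with plain Java serialization. LoaderObjectInputStream is a hypothetical re-creation of the idea, not Flink's class, and the isolated loader (no URLs, bootstrap parent) stands in for a loader that does not know the UDF class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

public class SerializationCopyDemo {

    /** Serializable stand-in for the UDF. */
    public static class Fn implements Serializable {
        private static final long serialVersionUID = 1L;
        public String eval(String s) { return "myFunction - " + s; }
    }

    /** Resolves classes by name through an explicitly chosen loader. */
    static class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Serializes obj and reads a copy back, resolving classes via loader. */
    static Object copyWith(Object obj, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new LoaderObjectInputStream(new ByteArrayInputStream(serialize(obj)), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Fn fn = new Fn();
        System.out.println(fn.eval("a")); // myFunction - a (the original instance works)

        Fn copy = (Fn) copyWith(fn, SerializationCopyDemo.class.getClassLoader());
        System.out.println(copy.eval("b")); // myFunction - b (this loader can see Fn)

        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            copyWith(fn, isolated);
        } catch (ClassNotFoundException e) {
            // Same failure mode as the trace: the class is unknown to the resolving loader.
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```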