Hi Dawid,
Following your suggestion, and given that I spawn a LocalEnvironment, I tried
the following:
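import java.io.File
import java.net.{URL, URLClassLoader}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.util.TemporaryClassLoaderContext

val root = new File("custom")
val classLoader: URLClassLoader = new URLClassLoader(
  Array[URL](root.toURI.toURL),
  Thread.currentThread().getContextClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.getDeclaredConstructor().newInstance()
val udf = instance.asInstanceOf[ScalarFunction]
// Scala has no try-with-resources, so the context is closed explicitly
val context = TemporaryClassLoaderContext.of(classLoader)
try {
  fsTableEnv.createTemporaryFunction("myFunction", udf)
  fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
} finally {
  context.close() // restores the previous context classloader
}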
Unfortunately, this still results in a ClassNotFoundException when executing the
environment. (The class is located outside of the classpath and is loaded
successfully; instances of it behave as expected.)
Did I possibly misunderstand what you were proposing?
Kind regards,
Jakub
________________________________
From: Dawid Wysakowicz
Sent: Thursday, December 10, 2020 09:59
To: Guowei Ma; Jakub N
Cc: [email protected]
Subject: Re: Flink UDF registration from jar at runtime
Hi Jakub,
As Guowei said, the UDF must be present in the user classloader, both when
compiling the program and when executing it on the cluster. Currently, the
TableEnvironment uses the thread context classloader as the "user classloader"
when compiling the query. Therefore you can do the trick via:
ClassLoader yourClassloader = ... // create your classloader with the UDF
try (TemporaryClassLoaderContext ignored =
        TemporaryClassLoaderContext.of(yourClassloader)) {
    fsTableEnv.createTemporaryFunction("myFunction", udf);
    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic");
}
Take a look at TemporaryClassLoaderContext[1] for a convenient way to do this
with cleanup at the end.
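The swap-and-restore pattern behind TemporaryClassLoaderContext can be sketched without Flink. This is a minimal illustration, not Flink's code: `ContextClassLoaders` and `withContextClassLoader` are hypothetical names, and in Flink itself you would use TemporaryClassLoaderContext with try-with-resources as shown above.

```java
import java.util.function.Supplier;

// Hypothetical helper mirroring the idea of Flink's TemporaryClassLoaderContext:
// install a classloader as the thread context classloader for the duration of
// `body`, then always put the previous one back.
final class ContextClassLoaders {

    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> body) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            // Restore even if body throws, so later code on this thread is unaffected.
            thread.setContextClassLoader(previous);
        }
    }
}
```

In the Flink case, the body would contain the createTemporaryFunction and executeSql calls, so that the planner sees the UDF's classloader while it compiles the query.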
As for the second problem, having the UDF on the classpath during execution:
If you are just spawning a LocalEnvironment the above should do the trick as it
will use the context classloader. If you are submitting to a cluster, you can
submit multiple jars as part of a single job either via the RemoteEnvironment
or the flink run command.
That's how we submit UDFs from separate jars in the sql-client. You can try to
go through a few classes there and see how it is done. I am afraid it's not the
easiest task, as there are quite a few classes to navigate through. You could
start from, e.g.,
org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]
[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java
[2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305
On 10/12/2020 09:15, Guowei Ma wrote:
Hi, Jakub
If I understand correctly, you want the submitted job to be able to load a
table function that is not in the job jar.
I don't think Flink supports this natively (maybe others know more).
But this requirement resembles code generation: you need to ship the "code"
to the job. You could refer to [1].
[1] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java#L29
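Shipping the "code" instead of a class on the classpath boils down to compiling source where it is needed and loading it through a dedicated classloader. GeneratedClass does this inside Flink (to my understanding with Janino); the sketch below swaps in the JDK's javax.tools compiler instead, as Jakub's QueryCommand code does, and assumes a JDK rather than a JRE at runtime. `RuntimeCompiler` and `compileAndLoad` are hypothetical names.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

// Hypothetical helper: compile Java source held as a string and load the result
// through a child classloader. Not Flink API; uses javax.tools instead of Janino.
final class RuntimeCompiler {

    /** Compiles `source` (declaring public class `className`) and returns the loaded class. */
    static Class<?> compileAndLoad(String className, String source, ClassLoader parent)
            throws Exception {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler; run on a JDK, not a JRE");
        }
        Path dir = Files.createTempDirectory("udf-src");
        Path file = dir.resolve(className + ".java");
        Files.write(file, source.getBytes(StandardCharsets.UTF_8));
        // Write the .class file into `dir`; a return value of 0 means success.
        int status = compiler.run(null, null, null, "-d", dir.toString(), file.toString());
        if (status != 0) {
            throw new IllegalStateException("Compilation failed with status " + status);
        }
        // The loader is intentionally left open so the class can keep loading lazily.
        URLClassLoader loader =
                new URLClassLoader(new URL[] {dir.toFile().toURI().toURL()}, parent);
        return loader.loadClass(className);
    }
}
```

For a real ScalarFunction, javac would additionally need Flink on its classpath (e.g. via the `-classpath` option), which this sketch omits; and, as discussed in this thread, the resulting loader still has to be visible to Flink as the user classloader.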
Best,
Guowei
On Tue, Dec 8, 2020 at 8:40 PM Jakub N <[email protected]> wrote:
Hi Guowei,
1. Unfortunately, the UDF and the job are not in the same fatjar. Essentially,
there is only one "fatjar" containing the Flink environment and the job; the
UDF is separate.
2. Yes, that is correct.
3. As explained in 1., I don't submit job jars to the Flink environment;
instead, the job is created and submitted within the "fatjar".
Code-wise, nothing changed except for where the location of the UDF is specified.
"Submitting to the environment" works as follows:
1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
2. (Register UDFs)
3. Create tables
4. Query on the tables
5. Execute the environment
The overall process is executed as one program.
Apologies if any of these explanations are unclear or too vague.
Kind regards,
Jakub
________________________________
From: Guowei Ma <[email protected]>
Sent: Tuesday, December 8, 2020 06:34
To: Jakub N <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: Flink UDF registration from jar at runtime
Hi, Jakub
I am not familiar with `sbt pack`, but I assume you are doing the following
(correct me if I misunderstand you):
1. The UDF and Job jar are in the same "fatjar"
2. You instantiate ("new") a UDF object in the job.
3. You submit the "fatjar" to the local Flink environment.
In theory there should not be any problem. Could you share how you changed the
code and how you submit your job to the local environment?
Best,
Guowei
On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[email protected]> wrote:
Hi Guowei,
It turns out that, for my application, I unfortunately can't have the UDF on the
"job's" classpath. Since I am using a local Flink environment and `sbt pack`
(similar to a fatjar) to create launch scripts, to my understanding I can't
access the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?
Kind regards,
Jakub
________________________________
From: Jakub N <[email protected]>
Sent: Monday, December 7, 2020 12:59
To: Guowei Ma <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: Flink UDF registration from jar at runtime
Hi Guowei,
Many thanks for your help. Your suggestion indeed solved the issue. I moved
`myFunction` to the classpath where execution starts.
Kind regards,
Jakub
________________________________
From: Guowei Ma <[email protected]>
Sent: Monday, December 7, 2020 12:16
To: Jakub N <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: Flink UDF registration from jar at runtime
Hi Jakub,
I think the problem is that `cls`, which you load at runtime, is not visible to
the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader, so you
could try putting `myFunction` on the job's classpath.
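Why the loader matters can be reproduced without Flink. The stack trace in the first message of this thread goes through `InstantiationUtil$ClassLoaderObjectInputStream.resolveClass`, i.e. every class named in the serialized bytes is looked up through one chosen classloader. The sketch below imitates that idea with a plain `ObjectInputStream` subclass; `LoaderAwareSerde` and `Greeting` are hypothetical names, and this is not Flink's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Sketch: Java serialization round trip where class resolution on the reading
// side goes through an explicitly supplied classloader.
final class LoaderAwareSerde {

    /** Stand-in for a UDF object that gets serialized. */
    static final class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws ClassNotFoundException {
                // If `loader` cannot see the class, this throws ClassNotFoundException,
                // the same failure reported in this thread.
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return in.readObject();
        }
    }
}
```

Deserializing with a loader that can see `Greeting` round-trips the object, while passing `new URLClassLoader(new URL[0], null)`, which only delegates to the bootstrap loader, fails with `ClassNotFoundException`.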
Best,
Guowei
On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[email protected]> wrote:
Hi Guowei,
Thanks for your help. Here is the relevant code (QueryCommand class):
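import java.io.File
import java.net.{URL, URLClassLoader}
import javax.tools.{JavaCompiler, ToolProvider}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction

val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()
val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

// Compile the UDF source at runtime; without -d, javac writes the .class next to the source
val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

// Load the compiled class through a child classloader and register an instance of it
val root = new File("target/custom")
val classLoader: URLClassLoader =
  new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.getDeclaredConstructor().newInstance()
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

// creating tables...
fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

def execute(): Unit = fsEnv.execute()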
myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {
    public String eval(String s) {
        return "myFunction - " + s;
    }
}
Execution works as follows: a QueryCommand instance is created, some properties
are set, and execute() is invoked.
Let me know if any other relevant information is missing, alternatively you can
also have a look at the source code here
(https://github.com/codefeedr/kafkaquery).
Kind regards,
Jakub
________________________________
From: Guowei Ma <[email protected]>
Sent: Monday, December 7, 2020 02:55
To: Jakub N <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: Flink UDF registration from jar at runtime
Hi, Jakub
In theory there should not be any problem, because you can register the
function object.
Would you share your code and the shell command with which you submit your
job?
Best,
Guowei
On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[email protected]> wrote:
The current setup is: Data in Kafka -> Kafka Connector ->
StreamTableEnvironment -> execute Flink SQL queries
I would like to register Flink user-defined functions (UDFs) from a jar or Java
class file at runtime. So far I have tried using Java's ClassLoader to obtain an
instance of a ScalarFunction (UDF) and registering it in the
StreamTableEnvironment. When I try to execute a query that uses the UDF, I get
the following exception:
Exception in thread "main" java.lang.ClassNotFoundException: myFunction
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
    at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:285)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    ...
I have verified that the generated instance of the UDF behaves as expected when
invoking any of its methods.
Do you have any ideas on why this is failing?