Hi Dawid,

Thank you for the response.

I create a remote StreamExecutionEnvironment, wrap a new StreamTableEnvironment around it, build the job, and then execute it via executeAsync(). It sounds like I cannot send the serialized job graph to the session cluster's JobManager via the web API.

This approach does work for submitting a job, but only if my code is bundled in a jar on the JobManager. When I build the job with the remote environment, it gets as far as initialization but then fails on the TaskManager nodes because the UDF class cannot be located. I was surprised that it is the TaskManager that throws the error while loading my UDF class. The session cluster's JobManager has no knowledge of the UDF class either, so it seems it must simply be forwarding the job graph submitted to its web API. I have included the stack trace at the bottom of this e-mail in case it is helpful.

Is there a way I can capture and send the serialized job graph?

Thanks again!
Joel

Stack trace:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.buzuli.example.MyUDF
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: com.buzuli.example.MyUDF
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readArray(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    ... 7 more

On Tue, Sep 7, 2021 at 12:03 Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hi Joel,
>
> There are a few uncertainties in my reply, so I am including Timo, who should
> be able to confirm or deny what I wrote.
>
> Generally speaking, once a JobGraph is created it should contain the UDF. I
> might be wrong here, but the UDF must be available while the JobGraph is
> created. I believe you're submitting the job via the `flink run` command.
> Is that right?
> If so, the UDF must be available on the classpath of this
> client, as that is the moment the JobGraph is created. If you use the web UI
> to submit the job, then the JobGraph is created on the JobManager and the UDF
> must be available there.
>
> Best,
>
> Dawid
>
> On 07/09/2021 16:31, Joel Edwards wrote:
>
> Good day,
>
> I have been attempting to submit a job to a session cluster. This job
> involves a pair of dynamic tables and a SQL query. The SQL query calls
> a UDF, which I register via the Table API's createTemporarySystemFunction()
> method. The job runs locally, but when I attempt to submit it to a remote
> session cluster, the job fails with the error:
>
> `Cannot load user class: <fully-qualified-class-name>`
>
> If I place a fat jar containing all of my local dependencies on the
> JobManagers and TaskManagers, the UDF is loaded. However, I would
> expect the UDF to be serialized and sent with the rest of the job. I have
> looked over the UDF documentation, and I don't see a reason why it would
> not be serialized with the rest of the job. Since no error is reported
> about serializing the UDF, though, my assumptions about UDF serialization
> must be incorrect. Is there a hint I can use to make the closure cleaner
> identify the UDF for serialization? I suspect it is not being included
> because it is referenced only in the SQL query, and not in the streams
> feeding the input table or the stream consuming the output table.
>
> Summary of questions:
> - Will UDFs be serialized with the job, or are they never included?
> - Is it possible to hint at what should be serialized and sent along with
>   the job?
>
> Thank you,
> Joel
>
> --
> Joel Edwards
> Software Architect
> Ed-Craft Software Solutions
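For reference, the `Cannot load user class` error in the stack trace is raised while Java deserialization resolves a class name (`InstantiationUtil$ClassLoaderObjectInputStream.resolveClass` calling `Class.forName`). A Java-serialized object records its class name, `serialVersionUID`, and field values, but not the class's bytecode, so the receiving JVM must already be able to load the class. The sketch below is plain Java with no Flink dependency and only illustrates that mechanism under stated assumptions: `MyUdf` is a hypothetical stand-in for the real UDF, `LoaderObjectInputStream` is a simplified, hypothetical analogue of Flink's `ClassLoaderObjectInputStream` (not its actual code), and the empty `URLClassLoader` stands in for a user-code classloader that lacks the jar containing the UDF.

```java
import java.io.*;
import java.net.URL;
import java.net.URLClassLoader;

public class Main {
    // Hypothetical stand-in for a user-defined function; not the real MyUDF.
    static class MyUdf implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "my-udf";
    }

    // Resolves classes through an explicit classloader, loosely analogous to
    // Flink's InstantiationUtil.ClassLoaderObjectInputStream (simplified assumption).
    static class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Only the class *name* travels in the stream; the bytecode must be loadable here.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                     new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new MyUdf());

        // The application classloader can see MyUdf, so this succeeds.
        Object ok = deserialize(data, Main.class.getClassLoader());
        System.out.println("app loader resolved: " + ok.getClass().getName());

        // An empty URLClassLoader whose parent is the platform loader cannot see MyUdf,
        // roughly like a TaskManager user-code classloader without the user jar.
        try (URLClassLoader empty =
                     new URLClassLoader(new URL[0], ClassLoader.getPlatformClassLoader())) {
            deserialize(data, empty);
            System.out.println("unexpected: empty loader resolved MyUdf");
        } catch (ClassNotFoundException e) {
            System.out.println("empty loader: ClassNotFoundException for " + e.getMessage());
        }
    }
}
```

Running `main` should show the class resolving through the application classloader and a `ClassNotFoundException` through the empty loader, which mirrors the `Caused by:` line in the trace above.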