I'm runnin the code from Eclipse, the jar exists and it contains the classes Flink is not finding..maybe I can try to use IntelliJ in the afternoon
On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <ches...@apache.org> wrote: > @Kostas: Ah, I missed that. > > @Flavio: the only alternative I can think your jar does not contain the > classes, or does not exist at all on the machine your application is run > on. > > On 10/28/2020 12:08 PM, Kostas Kloudas wrote: > > Hi all, > > > > I will have a look in the whole stack trace in a bit. > > > > @Chesnay Schepler I think that we are setting the correct classloader > > during jobgraph creation [1]. Is that what you mean? > > > > Cheers, > > Kostas > > > > [1] > https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122 > > > > On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier > > <pomperma...@okkam.it> wrote: > >> Always the same problem. > >> > >> Caused by: java.lang.ClassNotFoundException: it.test.XXX > >> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471) > >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) > >> at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) > >> at > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) > >> at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) > >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) > >> ... 10 more > >> > >> I've also tried with > >> > >> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, > "parent-first"); > >> > >> but nothing changes. > >> > >> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org> > wrote: > >>> hmm..it appears as if PackagedProgramUtils#createJobGraph does some > things outside the usercode classlodaer (getPipelineFromProgram()), > specifically the call to the main method. > >>> > >>> @klou This seems like wrong behavior? > >>> > >>> @Flavio What you could try in the meantime is wrap the call to > createJobGraph like this: > >>> > >>> final ClassLoader contextClassLoader = > Thread.currentThread().getContextClassLoader(); > >>> try { > >>> > > Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader()); > >>> // do tstuff > >>> } finally { > >>> Thread.currentThread().setContextClassLoader(contextClassLoader); > >>> } > >>> > >>> > >>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote: > >>> > >>> Any help here? How can I understand why the classes inside the jar > are not found when creating the PackagedProgram? > >>> > >>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier < > pomperma...@okkam.it> wrote: > >>>> In the logs I see that the jar is the classpath (I'm trying to debug > the program from the IDE)..isn'it? > >>>> > >>>> Classpath: [file:/tmp/job-bundle.jar] > >>>> ... > >>>> > >>>> Best, > >>>> Flavio > >>>> > >>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> > wrote: > >>>>> * your JobExecutor is _not_ putting it on the classpath. > >>>>> > >>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote: > >>>>> > >>>>> Well it happens on the client before you even hit the > RestClusterClient, so I assume that either your jar is not packaged > correctly or you your JobExecutor is putting it on the classpath. > >>>>> > >>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote: > >>>>> > >>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main > class I'm trying to use as a client towards the Flink cluster - session > mode). > >>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar). > >>>>> > >>>>> The code of getBatchEnv is: > >>>>> > >>>>> @Deprecated > >>>>> public static BatchEnv getBatchEnv() { > >>>>> // TODO use the following when ready to convert from/to > datastream > >>>>> // return > getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build()); > >>>>> ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > >>>>> BatchTableEnvironment ret = BatchTableEnvironment.create(env); > >>>>> customizeEnv(ret); > >>>>> return new BatchEnv(env, ret); > >>>>> } > >>>>> > >>>>> private static void customizeEnv(TableEnvironment ret) { > >>>>> final Configuration conf = ret.getConfig().getConfiguration(); > >>>>> // > conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, > 2); > >>>>> conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR); > >>>>> conf.setString(BlobServerOptions.STORAGE_DIRECTORY, > FLINK_TEST_TMP_DIR); > >>>>> // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, > 4); //NOSONAR > >>>>> // > conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, > 0.4f);//NOSONAR > >>>>> // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, > 32768 * 2);//NOSONAR > >>>>> // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, > 32768 * 2);// NOSONAR > >>>>> conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, > 0);// NOSONAR > >>>>> conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR > >>>>> conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR > >>>>> conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR > >>>>> conf.set(ClientOptions.CLIENT_TIMEOUT, > Duration.ofMinutes(10));// NOSONAR > >>>>> final List<String> kryoSerializers = new ArrayList<>(); > >>>>> kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, > JodaDateTimeSerializer.class)); > >>>>> > kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, > TBaseSerializer.class)); > >>>>> > kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, > TBaseSerializer.class)); > >>>>> conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, > kryoSerializers); > >>>>> > >>>>> } > >>>>> > >>>>> Classpath: [file:/tmp/job-bundle.jar] > >>>>> > >>>>> System.out: (none) > >>>>> > >>>>> System.err: (none) > >>>>> at > org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245) > >>>>> at > org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164) > >>>>> at > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77) > >>>>> at > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109) > >>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42) > >>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb > >>>>> at > it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116) > >>>>> at > it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95) > >>>>> at it.okkam.datalinks.flink.jobs > .EnsReconciliator.main(EnsReconciliator.java:73) > >>>>> at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > >>>>> at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >>>>> at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566) > >>>>> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > >>>>> at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > >>>>> at > org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150) > >>>>> ... 3 more > >>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb > >>>>> at java.base/java.net > .URLClassLoader.findClass(URLClassLoader.java:471) > >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) > >>>>> at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) > >>>>> at > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) > >>>>> at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) > >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) > >>>>> ... 13 more > >>>>> > >>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> > wrote: > >>>>>> Hi Flavio, > >>>>>> can you share the full stacktrace you are seeing? I'm wondering if > the error happens on the client or server side (among other questions I > have). > >>>>>> > >>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier < > pomperma...@okkam.it> wrote: > >>>>>>> Hi to all, > >>>>>>> I was trying to use the RestClusterClient to submit my job to the > Flink cluster. > >>>>>>> However when I submit the job Flink cannot find the classes > contained in the "fat" jar..what should I do? Am I missing something in my > code? > >>>>>>> This is the current client code I'm testing: > >>>>>>> > >>>>>>> public static void main(String[] args) throws > MalformedURLException { > >>>>>>> final Configuration flinkConf = new Configuration(); > >>>>>>> flinkConf.set(RestOptions.ADDRESS, "localhost"); > >>>>>>> flinkConf.set(RestOptions.PORT, 8081); > >>>>>>> > >>>>>>> final File jarFile = new File("/tmp/job-bundle.jar"); > >>>>>>> final String jobClass = "it.flink.MyJob"; > >>>>>>> > >>>>>>> try { > >>>>>>> final RestClusterClient<StandaloneClusterId> client = > >>>>>>> new RestClusterClient<>(flinkConf, > StandaloneClusterId.getInstance()); > >>>>>>> > >>>>>>> final PackagedProgram packagedProgram = > PackagedProgram.newBuilder()// > >>>>>>> .setJarFile(jarFile)// > >>>>>>> // .setUserClassPaths(userClassPaths) > >>>>>>> > .setEntryPointClassName(jobClass).setConfiguration(flinkConf)// > >>>>>>> .build(); > >>>>>>> > >>>>>>> final JobGraph jobGraph = > >>>>>>> PackagedProgramUtils.createJobGraph(packagedProgram, > flinkConf, 1, true); > >>>>>>> > >>>>>>> final DetachedJobExecutionResult jobExecutionResult = > >>>>>>> > client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get(); > >>>>>>> > >>>>>>> System.out.println(jobExecutionResult.getJobID()); > >>>>>>> } catch (Exception ex) { > >>>>>>> ex.printStackTrace(); > >>>>>>> System.exit(1); > >>>>>>> } > >>>>>>> } > >>>>>>> > >>>>>>> Best, > >>>>>>> Flavio > >>>>> > >>>>> >