I'm running the code from Eclipse. The jar exists and it contains the
classes Flink is not finding. Maybe I can try IntelliJ in the
afternoon.
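
One way to confirm the jar contents independently of the IDE is to look up
the class entry directly. The following is only a minimal sketch: the class
name JarClassCheck and its two helper methods are hypothetical, the default
path and class name are the ones mentioned in this thread, and the only
library API used is the JDK's java.util.jar.JarFile. It only looks at
top-level entries of the jar, so classes packed inside nested jars would not
be reported.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Flink.
class JarClassCheck {

    // Converts a binary class name (it.test.MyOb) into its jar entry path (it/test/MyOb.class).
    static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar has a top-level entry for the given class.
    static boolean containsClass(File jar, String className) throws IOException {
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are the values from this thread; pass other values as arguments.
        File jar = new File(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        String className = args.length > 1 ? args[1] : "it.test.MyOb";
        if (!jar.isFile()) {
            System.out.println("Jar not found: " + jar);
            return;
        }
        boolean found = containsClass(jar, className);
        System.out.println(className + (found ? " found in " : " NOT found in ") + jar);
    }
}
```

Running it on the machine where the client runs, with the jar path and the
missing class name as arguments, shows whether the problem is in the
packaging or in the class loading.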

On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <ches...@apache.org>
wrote:

> @Kostas: Ah, I missed that.
>
> @Flavio: the only alternative I can think of is that your jar does not
> contain the classes, or does not exist at all on the machine your
> application is run on.
>
> On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I will have a look at the whole stack trace in a bit.
> >
> > @Chesnay Schepler I think that we are setting the correct classloader
> > during jobgraph creation [1]. Is that what you mean?
> >
> > Cheers,
> > Kostas
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
> >
> > On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> > <pomperma...@okkam.it> wrote:
> >> Always the same problem.
> >>
> >> Caused by: java.lang.ClassNotFoundException: it.test.XXX
> >> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> >> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> >> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> >> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> >> ... 10 more
> >>
> >> I've also tried with
> >>
> >>      flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER,
> "parent-first");
> >>
> >> but nothing changes.
> >>
> >> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org>
> wrote:
> >>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some
> things outside the usercode classloader (getPipelineFromProgram()),
> specifically the call to the main method.
> >>>
> >>> @klou This seems like wrong behavior?
> >>>
> >>> @Flavio What you could try in the meantime is wrap the call to
> createJobGraph like this:
> >>>
> >>> final ClassLoader contextClassLoader =
> >>>     Thread.currentThread().getContextClassLoader();
> >>> try {
> >>>     Thread.currentThread().setContextClassLoader(
> >>>         packagedProgram.getUserCodeClassLoader());
> >>>     // do stuff, i.e. the call to createJobGraph goes here
> >>> } finally {
> >>>     // always restore the original context class loader
> >>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
> >>> }
> >>>
> >>>
> >>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
> >>>
> >>> Any help here?  How can I understand why the classes inside the jar
> are not found when creating the PackagedProgram?
> >>>
> >>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
> >>>> In the logs I see that the jar is in the classpath (I'm trying to debug
> the program from the IDE), isn't it?
> >>>>
> >>>> Classpath: [file:/tmp/job-bundle.jar]
> >>>> ...
> >>>>
> >>>> Best,
> >>>> Flavio
> >>>>
> >>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org>
> wrote:
> >>>>> * your JobExecutor is _not_ putting it on the classpath.
> >>>>>
> >>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
> >>>>>
> >>>>> Well, it happens on the client before you even hit the
> RestClusterClient, so I assume that either your jar is not packaged
> correctly or your JobExecutor is putting it on the classpath.
> >>>>>
> >>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
> >>>>>
> >>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
> class I'm trying to use as a client towards the Flink cluster - session
> mode).
> >>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
> >>>>>
> >>>>> The code of getBatchEnv is:
> >>>>>
> >>>>> @Deprecated
> >>>>>    public static BatchEnv getBatchEnv() {
> >>>>>      // TODO use the following when ready to convert from/to
> datastream
> >>>>>      // return
> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
> >>>>>      ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> >>>>>      BatchTableEnvironment ret = BatchTableEnvironment.create(env);
> >>>>>      customizeEnv(ret);
> >>>>>      return new BatchEnv(env, ret);
> >>>>>    }
> >>>>>
> >>>>>    private static void customizeEnv(TableEnvironment ret) {
> >>>>>      final Configuration conf = ret.getConfig().getConfiguration();
> >>>>>      //
> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
> 2);
> >>>>>      conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
> >>>>>      conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
> FLINK_TEST_TMP_DIR);
> >>>>>      // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
> 4); //NOSONAR
> >>>>>      //
> conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
> 0.4f);//NOSONAR
> >>>>>      // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX,
> 32768 * 2);//NOSONAR
> >>>>>      // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
> 32768 * 2);// NOSONAR
> >>>>>      conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
> 0);// NOSONAR
> >>>>>      conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
> >>>>>      conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
> >>>>>      conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
> >>>>>      conf.set(ClientOptions.CLIENT_TIMEOUT,
> Duration.ofMinutes(10));// NOSONAR
> >>>>>      final List<String> kryoSerializers = new ArrayList<>();
> >>>>>      kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
> JodaDateTimeSerializer.class));
> >>>>>
> kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
> TBaseSerializer.class));
> >>>>>
> kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
> TBaseSerializer.class));
> >>>>>      conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS,
> kryoSerializers);
> >>>>>
> >>>>>    }
> >>>>>
> >>>>> Classpath: [file:/tmp/job-bundle.jar]
> >>>>>
> >>>>> System.out: (none)
> >>>>>
> >>>>> System.err: (none)
> >>>>> at
> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
> >>>>> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
> >>>>> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
> >>>>> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
> >>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
> >>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
> >>>>> at
> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
> >>>>> at
> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
> >>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
> >>>>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> >>>>> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >>>>> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >>>>> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
> >>>>> ... 3 more
> >>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
> >>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> >>>>> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> >>>>> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> >>>>> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> >>>>> ... 13 more
> >>>>>
> >>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org>
> wrote:
> >>>>>> Hi Flavio,
> >>>>>> can you share the full stacktrace you are seeing? I'm wondering if
> the error happens on the client or server side (among other questions I
> have).
> >>>>>>
> >>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
> >>>>>>> Hi to all,
> >>>>>>> I was trying to use the RestClusterClient to submit my job to the
> Flink cluster.
> >>>>>>> However, when I submit the job, Flink cannot find the classes
> contained in the "fat" jar. What should I do? Am I missing something in my
> code?
> >>>>>>> This is the current client code I'm testing:
> >>>>>>>
> >>>>>>> public static void main(String[] args) throws
> MalformedURLException {
> >>>>>>>      final Configuration flinkConf = new Configuration();
> >>>>>>>      flinkConf.set(RestOptions.ADDRESS, "localhost");
> >>>>>>>      flinkConf.set(RestOptions.PORT, 8081);
> >>>>>>>
> >>>>>>>      final File jarFile = new File("/tmp/job-bundle.jar");
> >>>>>>>      final String jobClass = "it.flink.MyJob";
> >>>>>>>
> >>>>>>>      try {
> >>>>>>>        final RestClusterClient<StandaloneClusterId> client =
> >>>>>>>            new RestClusterClient<>(flinkConf,
> StandaloneClusterId.getInstance());
> >>>>>>>
> >>>>>>>        final PackagedProgram packagedProgram =
> >>>>>>>            PackagedProgram.newBuilder()
> >>>>>>>                .setJarFile(jarFile)
> >>>>>>>                // .setUserClassPaths(userClassPaths)
> >>>>>>>                .setEntryPointClassName(jobClass)
> >>>>>>>                .setConfiguration(flinkConf)
> >>>>>>>                .build();
> >>>>>>>
> >>>>>>>        final JobGraph jobGraph =
> >>>>>>>            PackagedProgramUtils.createJobGraph(packagedProgram,
> flinkConf, 1, true);
> >>>>>>>
> >>>>>>>        final DetachedJobExecutionResult jobExecutionResult =
> >>>>>>>
> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
> >>>>>>>
> >>>>>>>        System.out.println(jobExecutionResult.getJobID());
> >>>>>>>      } catch (Exception ex) {
> >>>>>>>        ex.printStackTrace();
> >>>>>>>        System.exit(1);
> >>>>>>>      }
> >>>>>>> }
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Flavio
> >>>>>
> >>>>>
>
