Hi all,

I will have a look at the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
<pomperma...@okkam.it> wrote:
>
> Always the same problem.
>
> Caused by: java.lang.ClassNotFoundException: it.test.XXX
> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> ... 10 more
>
> I've also tried with
>
>     flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>
> but nothing changes.
>
> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org> wrote:
>>
>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things
>> outside the usercode classloader (getPipelineFromProgram()), specifically
>> the call to the main method.
>>
>> @klou This seems like wrong behavior?
>>
>> @Flavio What you could try in the meantime is to wrap the call to
>> createJobGraph like this:
>>
>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>> try {
>>    Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>    // do stuff (the createJobGraph call goes here)
>> } finally {
>>    Thread.currentThread().setContextClassLoader(contextClassLoader);
>> }
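
A minimal sketch of the wrapping suggested above, written as a reusable helper. `ContextClassLoaderScope` and `callWith` are hypothetical names for illustration and are not part of Flink; the only calls used are `Thread#getContextClassLoader` and `Thread#setContextClassLoader` from the JDK.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not a Flink class): runs an action under a given context classloader. */
final class ContextClassLoaderScope {

    private ContextClassLoaderScope() {}

    /**
     * Sets the given classloader as the current thread's context classloader,
     * runs the action, and restores the previous classloader afterwards.
     */
    static <T> T callWith(ClassLoader classLoader, Callable<T> action) throws Exception {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(classLoader);
        try {
            return action.call();
        } finally {
            // Restore the previous loader even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }
}
```

Assuming the `packagedProgram` and `flinkConf` variables from the client code further down the thread, the call would then look roughly like `ContextClassLoaderScope.callWith(packagedProgram.getUserCodeClassLoader(), () -> PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true))`.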
>>
>>
>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>
>> Any help here?  How can I understand why the classes inside the jar are not 
>> found when creating the PackagedProgram?
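
One way to see which jars a classloader can actually resolve is to walk its parent chain and print the URLs of every `URLClassLoader` in it. The sketch below assumes the user-code classloader is a `URLClassLoader` subclass, which the `URLClassLoader.findClass` frames in the stack traces of this thread suggest; `ClassLoaderChain` is a hypothetical helper name, not a Flink class.

```java
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical diagnostic helper (not a Flink class). */
final class ClassLoaderChain {

    private ClassLoaderChain() {}

    /**
     * Returns one line per classloader, starting at the given loader and
     * following getParent() until the bootstrap loader (null) is reached.
     * For URLClassLoader instances the searched URLs are appended.
     */
    static List<String> describe(ClassLoader start) {
        final List<String> lines = new ArrayList<>();
        for (ClassLoader cl = start; cl != null; cl = cl.getParent()) {
            String line = cl.getClass().getName();
            if (cl instanceof URLClassLoader) {
                line += " " + Arrays.toString(((URLClassLoader) cl).getURLs());
            }
            lines.add(line);
        }
        return lines;
    }
}
```

Printing `ClassLoaderChain.describe(packagedProgram.getUserCodeClassLoader())` before the call to `createJobGraph` would show whether `/tmp/job-bundle.jar` is among the URLs that loader searches.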
>>
>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it> 
>> wrote:
>>>
>>> In the logs I see that the jar is on the classpath (I'm trying to debug the
>>> program from the IDE), isn't it?
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>> ...
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> 
>>> wrote:
>>>>
>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>
>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>
>>>> Well it happens on the client before you even hit the RestClusterClient, 
>>>> so I assume that either your jar is not packaged correctly or your
>>>> JobExecutor is putting it on the classpath.
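
To rule out the packaging hypothesis, one could check that the jar really has an entry for the missing class. A small sketch using only `java.util.jar.JarFile` from the JDK; `JarClassCheck` is a hypothetical helper name, and the jar path and class name in the usage note are the ones mentioned in this thread.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical diagnostic helper (not a Flink class). */
final class JarClassCheck {

    private JarClassCheck() {}

    /**
     * Returns true if the jar has an entry for the given binary class name,
     * e.g. "it.test.MyOb" is looked up as "it/test/MyOb.class".
     */
    static boolean containsClass(File jar, String className) throws IOException {
        final String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getEntry(entryName) != null;
        }
    }
}
```

For example, `JarClassCheck.containsClass(new File("/tmp/job-bundle.jar"), "it.test.MyOb")` should return true for a correctly packaged fat jar; `jar tf /tmp/job-bundle.jar` lists the same entries from the command line.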
>>>>
>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>
>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class 
>>>> I'm trying to use as a client towards the Flink cluster - session mode).
>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>
>>>> The code of getBatchEnv is:
>>>>
>>>> @Deprecated
>>>>   public static BatchEnv getBatchEnv() {
>>>>     // TODO use the following when ready to convert from/to datastream
>>>>     // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>     customizeEnv(ret);
>>>>     return new BatchEnv(env, ret);
>>>>   }
>>>>
>>>>   private static void customizeEnv(TableEnvironment ret) {
>>>>     final Configuration conf = ret.getConfig().getConfiguration();
>>>>     // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>>>>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>>>>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>>>>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>>>>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>>>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>>>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>>>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>>>>     final List<String> kryoSerializers = new ArrayList<>();
>>>>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>
>>>>   }
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>
>>>> System.out: (none)
>>>>
>>>> System.err: (none)
>>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>> ... 3 more
>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>> ... 13 more
>>>>
>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:
>>>>>
>>>>> Hi Flavio,
>>>>> can you share the full stacktrace you are seeing? I'm wondering if the 
>>>>> error happens on the client or server side (among other questions I have).
>>>>>
>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it> 
>>>>> wrote:
>>>>>>
>>>>>> Hi to all,
>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink 
>>>>>> cluster.
>>>>>> However, when I submit the job, Flink cannot find the classes contained in
>>>>>> the "fat" jar. What should I do? Am I missing something in my code?
>>>>>> This is the current client code I'm testing:
>>>>>>
>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>     final Configuration flinkConf = new Configuration();
>>>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>>>
>>>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>     final String jobClass = "it.flink.MyJob";
>>>>>>
>>>>>>     try {
>>>>>>       final RestClusterClient<StandaloneClusterId> client =
>>>>>>           new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>
>>>>>>       final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>           .setJarFile(jarFile)//
>>>>>>           // .setUserClassPaths(userClassPaths)
>>>>>>           .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>           .build();
>>>>>>
>>>>>>       final JobGraph jobGraph =
>>>>>>           PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>
>>>>>>       final DetachedJobExecutionResult jobExecutionResult =
>>>>>>           client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>
>>>>>>       System.out.println(jobExecutionResult.getJobID());
>>>>>>     } catch (Exception ex) {
>>>>>>       ex.printStackTrace();
>>>>>>       System.exit(1);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Best,
>>>>>> Flavio
