I just copied my response because my other email address is not accepted on
the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the
issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite
node a new Flink mini cluster. These mini cluster won't communicate with
each other and run independently. Is this what you intend to do?

[1] https://github.com/apache/flink/pull/3781

Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:

> Let's wait for Till then, I hope he can figure this out.
>
> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Ok, now the question is also about what classloaders Ignite is creating
>> and how they are used, but the relevant code line in Flink is probably in
>> FlinkMiniCluster.scala, line 538 (current master):
>>
>>  try {
>>  JobClient.submitJobAndWait(
>>    clientActorSystem,
>>    configuration,
>>    leaderRetrievalService,
>>    jobGraph,
>>    timeout,
>>    printUpdates,
>>    this.getClass.getClassLoader())
>> } finally {
>>    if(!useSingleActorSystem) {
>>      // we have to shutdown the just created actor system
>>      shutdownJobClientActorSystem(clientActorSystem)
>>    }
>>  }
>>
>>
>> This is what is executed as part of executing a job through
>> LocalEnvironment. As we can see, the classloader is set to the classloader
>> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
>> this classloader might not know your user code. What you could do is
>> changing this line in a custom Flink build, changing line 538 for example
>> to Thread.currentThread().getContextClassloader() and ensuring that the
>> context classloader ins the runnable is a classloader that a) knows the
>> user code and b) is a child of the classloader that knows the Ignite and
>> Flink classes. Notice that this is not a general solution and should not
>> become a general fix.
>>
>> I have heard that Till is about to change some things about local
>> execution, so I included him in CC. Maybe he can provide additional hints
>> how your use case might be better supported in the upcoming Flink 1.3.
>>
>> Best,
>> Stefan
>>
>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>>
>> I updated the code a little bit for clarity, now the line #56 mentioned
>> in my previous message is line #25.
>>
>> In summary the error I'm getting is this:
>>
>> ---
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>> Cannot load user class: com.test.Test
>> ClassLoader info: URL ClassLoader:
>> Class not resolvable through given classloader.
>> ---
>>
>> But if I'm not wrong, after trying to load the class through
>> URLClassLoader, Flink should try loading it with its parent ClassLoader,
>> which should be the same ClassLoader that executed the environment, and it
>> does have access to the class.
>>
>> Not sure what is wrong.
>>
>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hi Stefan,
>>>
>>> Check the code here: https://gist.github.com/
>>> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the
>>> page.
>>>
>>> Here are the results of the additional tests you mentioned:
>>>
>>> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
>>> closure, no problem with that
>>> 2. I tried implementing SourceFunction and SinkFunction in Test itself,
>>> I was able to instantiate the class inside the Ignite closure
>>> 3. I'm not sure what you meant in this point, is it something like what
>>> I tried in line #56?
>>>
>>> Additionally, I tried implementing the SourceFunction and SinkFunction
>>> in Test$Foo with the same result: it says "Cannot load user class:
>>> com.test.Test$Foo"
>>>
>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>
>>> Regards,
>>> Matt
>>>
>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would expect that the local environment picks up the class path from
>>>> the code that launched it. So I think the question is what happens behind
>>>> the scenes when you call ignite.compute().broadcast(runnable); . Which
>>>> classes are shipped and how is the classpath build in the environment that
>>>> runs the code. Your example is also not fully conclusive, because
>>>> com.myproj.Test (which you can successfully instantiate) and
>>>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
>>>> outer class is shipped with the broadcast call. My theory is that not all
>>>> classes are shipped (e.g. inner classes), but only Test . You could try
>>>> three things to analyze to problem a little more:
>>>>
>>>> 1) Create another inner class inside Test and try if you are still able
>>>> to instantiate also this class via reflection.
>>>> 2) Let Test class itself implement the map function (avoiding the usage
>>>> of other/inner classes) and see if this works.
>>>> 3) Check and set the thread’s context classloader inside the runnable
>>>> to something that contains all required classes and see if this gets picked
>>>> up by Flink.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>:
>>>>
>>>> Hi all,
>>>>
>>>> I'm trying to run Flink using a local environment, but on an Ignite
>>>> node to achieve collocation (as mentioned in my previous message on this
>>>> list).
>>>>
>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>>>> "cannot load user class" error as shown in [2].
>>>>
>>>> If you check line #29 on the code, I'm able to create an instance of
>>>> class Test, and it's the same context from which I'm creating the Flink
>>>> job. Shouldn't it work provided I'm using a local environment?
>>>>
>>>> It would be really nice to be able to inject a ClassLoader into the
>>>> chunk of code that creates the job. Is this currently possible?
>>>>
>>>> Any fix or workaround is appreciated!
>>>>
>>>> Best,
>>>> Matt
>>>>
>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>>>
>>>>
>>>>
>>>
>>
>>
>

Reply via email to