There were some newer messages on the issue as well 
(https://issues.apache.org/jira/browse/BEAM-1970 
<https://issues.apache.org/jira/browse/BEAM-1970>). The problem occurs if you 
reuse a Flink cluster for running the same job (or some other job that uses the 
same classes) again. The workaround would be to not reuse a cluster for several 
Jobs. If the jar of a Job has to be in the lib folder this also means that a 
cluster cannot be reused for a new jar since the cluster has to be restarted 
when you have a new job/jar so this workaround would have roughly the same 
“complexity”.

I think we’ll finally get rid of that once we move to a cluster-per-job model. 
:-)

Best,
Aljoscha

> On 29. Apr 2017, at 14:45, Stephan Ewen <[email protected]> wrote:
> 
> Avro tries to do some magic caching which does not work with multi 
> classloader setups.
> 
> A common workaround for these types of problems is to drop the relevant 
> classes into Flink's "lib" folder and not have them in the job's jar.
> 
> We wrote up some help on that: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html>
> 
> Some background: Flink has two different class loading modes:
> 
> (1) All in the system class loader, which is used for the one-job-Yarn 
> deployments (in the future also for Mesos / Docker / etc)
> 
> (2) Dynamic class loading, with one classloader per deployment. Used in all 
> setups that follow the pattern "start Flink first, launch job later".
> 
> 
> On Sat, Apr 29, 2017 at 8:02 AM, Aljoscha Krettek <[email protected] 
> <mailto:[email protected]>> wrote:
> Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s exactly 
> the same problem.
> 
> Best,
> Aljoscha
> 
> 
>> On 28. Apr 2017, at 22:29, Jins George <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> I also faced a similar issue when re-starting a flink job from a save point 
>> on an existing cluster . ClassCastException was with   KafkaCheckpointMark 
>> class . It was due to the different class loaders.  The workaround for me 
>> was to run one job per Yarn session.  For restart from savepoint, start a 
>> new yarn session and submit. 
>> 
>> Thanks,
>> Jins George
>> 
>> On 04/28/2017 09:34 AM, Frances Perry wrote:
>>> I have the same problem and am working around it with SerializableCoder. +1 
>>> to a real solution.
>>> 
>>> On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek < 
>>> <mailto:[email protected]>[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> I think you could. But we should also try finding a solution for this 
>>> problem.
>>> 
>>>> On 28. Apr 2017, at 17:31, Borisa Zivkovic <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> 
>>>> Hi Aljoscha,
>>>> 
>>>> this is probably the same problem I am facing.
>>>> 
>>>> I execute multiple pipelines on the same Flink cluster - all launched at 
>>>> the same time... 
>>>> 
>>>> I guess I can try to switch to SerializableCoder and see how that works?
>>>> 
>>>> thanks
>>>> 
>>>> 
>>>> 
>>>> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek < 
>>>> <mailto:[email protected]>[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> Hi,
>>>> There is this open issue:  
>>>> <https://issues.apache.org/jira/browse/BEAM-1970>https://issues.apache.o 
>>>> <https://issues.apache.o/>rg/jira/browse/BEAM-1970. Could this also be 
>>>> what is affecting you? Are you running several pipelines on the same Flink 
>>>> cluster, either one after another or at the same time?
>>>> 
>>>> Best,
>>>> Aljoscha
>>>>> On 28. Apr 2017, at 12:45, Borisa Zivkovic < 
>>>>> <mailto:[email protected]>[email protected] 
>>>>> <mailto:[email protected]>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I have this small pipeline that is writing data to Kafka (using 
>>>>> AvroCoder) and then another job is reading the same data from Kafka, 
>>>>> doing few transformations and then writing data back to different Kafka 
>>>>> topic (AvroCoder again).
>>>>> 
>>>>> First pipeline is very simple, read data from a text file, create POJO, 
>>>>> use AvroCoder to write POJO to Kafka.
>>>>> 
>>>>> Second pipeline is also simple, read POJO from Kafka, do few 
>>>>> transformations, create new POJO and write data to Kafka using AvroCoder 
>>>>> again.
>>>>> 
>>>>> When I use direct runner everything is ok.
>>>>> 
>>>>> When I switch to flink runner (small remote flink cluster) I get this 
>>>>> exception in the second pipeline
>>>>> 
>>>>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to 
>>>>> test.MyClass
>>>>> 
>>>>> This happens in the the first MapFunction immediately after reading data 
>>>>> from Kafka.
>>>>> 
>>>>> I found about this problem in Flink and how they resolve it but not sure 
>>>>> how to fix this when using Beam?!
>>>>> 
>>>>>  
>>>>> <https://issues.apache.org/jira/browse/FLINK-1390>https://issues.apache.org/jira
>>>>>  <https://issues.apache.org/jira>/browse/FLINK-1390
>>>>> 
>>>>> test.MyClass has annotation @DefaultCoder(AvroCoder.class) and is very 
>>>>> simple POJO.
>>>>> 
>>>>> Not sure how to fix this and still continue using AvroCoder.
>>>>> 
>>>>> My beam version is 0.6.0 - my flink version is 1.2.0
>>>>> 
>>>>> Anyone experienced something similar or has idea how to fix/workaround 
>>>>> this?
>>>>> 
>>>>> thanks
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
> 

Reply via email to