Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s exactly the 
same problem.

Best,
Aljoscha

> On 28. Apr 2017, at 22:29, Jins George <[email protected]> wrote:
> 
> I also faced a similar issue when restarting a Flink job from a savepoint 
> on an existing cluster. The ClassCastException was with the KafkaCheckpointMark 
> class, and it was due to the different class loaders. The workaround for me was 
> to run one job per YARN session: to restart from a savepoint, start a new 
> YARN session and submit the job there.
> 
> Thanks,
> Jins George
> 
> On 04/28/2017 09:34 AM, Frances Perry wrote:
>> I have the same problem and am working around it with SerializableCoder. +1 
>> to a real solution.
>> 
>> On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek <[email protected]> wrote:
>> I think you could. But we should also try finding a solution for this 
>> problem.
>> 
>>> On 28. Apr 2017, at 17:31, Borisa Zivkovic <[email protected]> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> this is probably the same problem I am facing.
>>> 
>>> I execute multiple pipelines on the same Flink cluster - all launched at 
>>> the same time... 
>>> 
>>> I guess I can try to switch to SerializableCoder and see how that works?
>>> 
>>> thanks
>>> 
>>> 
>>> 
>>> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <[email protected]> wrote:
>>> Hi,
>>> There is this open issue: https://issues.apache.org/jira/browse/BEAM-1970. 
>>> Could this also be what is affecting you? Are you running several 
>>> pipelines on the same Flink cluster, either one after another or at the 
>>> same time?
>>> 
>>> Best,
>>> Aljoscha
>>>> On 28. Apr 2017, at 12:45, Borisa Zivkovic <[email protected]> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I have a small pipeline that writes data to Kafka (using AvroCoder), 
>>>> and then another job reads the same data from Kafka, does a few 
>>>> transformations and writes the data back to a different Kafka topic 
>>>> (AvroCoder again).
>>>> 
>>>> The first pipeline is very simple: read data from a text file, create a 
>>>> POJO, and use AvroCoder to write the POJO to Kafka.
>>>> 
>>>> The second pipeline is also simple: read the POJO from Kafka, do a few 
>>>> transformations, create a new POJO and write the data to Kafka using 
>>>> AvroCoder again.
>>>> 
>>>> When I use the direct runner everything is OK.
>>>> 
>>>> When I switch to the Flink runner (a small remote Flink cluster) I get 
>>>> this exception in the second pipeline:
>>>> 
>>>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to 
>>>> test.MyClass
>>>> 
>>>> This happens in the first MapFunction immediately after reading data 
>>>> from Kafka.
>>>> 
>>>> I found out about this problem in Flink and how they resolved it, but I 
>>>> am not sure how to fix it when using Beam.
>>>> 
>>>> https://issues.apache.org/jira/browse/FLINK-1390
>>>> 
>>>> test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a 
>>>> very simple POJO.
>>>> 
>>>> Not sure how to fix this and still continue using AvroCoder.
>>>> 
>>>> My Beam version is 0.6.0 and my Flink version is 1.2.0.
>>>> 
>>>> Has anyone experienced something similar, or does anyone have an idea how 
>>>> to fix or work around this?
>>>> 
>>>> thanks
>>>> 
>>> 
>> 
>> 
> 
