Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s exactly the same problem.
Best,
Aljoscha

> On 28. Apr 2017, at 22:29, Jins George <[email protected]> wrote:
>
> I also faced a similar issue when restarting a Flink job from a savepoint
> on an existing cluster. The ClassCastException was with the
> KafkaCheckpointMark class. It was due to the different class loaders. The
> workaround for me was to run one job per YARN session. To restart from a
> savepoint, start a new YARN session and submit.
>
> Thanks,
> Jins George
>
> On 04/28/2017 09:34 AM, Frances Perry wrote:
>> I have the same problem and am working around it with SerializableCoder. +1
>> to a real solution.
>>
>> On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek <[email protected]> wrote:
>> I think you could. But we should also try finding a solution for this
>> problem.
>>
>>> On 28. Apr 2017, at 17:31, Borisa Zivkovic <[email protected]> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> this is probably the same problem I am facing.
>>>
>>> I execute multiple pipelines on the same Flink cluster - all launched at
>>> the same time...
>>>
>>> I guess I can try to switch to SerializableCoder and see how that works?
>>>
>>> thanks
>>>
>>> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <[email protected]> wrote:
>>> Hi,
>>> There is this open issue:
>>> https://issues.apache.org/jira/browse/BEAM-1970. Could this also be what
>>> is affecting you? Are you running several pipelines on the same Flink
>>> cluster, either one after another or at the same time?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 28. Apr 2017, at 12:45, Borisa Zivkovic <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have a small pipeline that writes data to Kafka (using AvroCoder),
>>>> and another job that reads the same data from Kafka, does a few
>>>> transformations and then writes the data back to a different Kafka
>>>> topic (AvroCoder again).
>>>>
>>>> The first pipeline is very simple: read data from a text file, create a
>>>> POJO, use AvroCoder to write the POJO to Kafka.
>>>>
>>>> The second pipeline is also simple: read the POJO from Kafka, do a few
>>>> transformations, create a new POJO and write the data to Kafka using
>>>> AvroCoder again.
>>>>
>>>> When I use the direct runner everything is ok.
>>>>
>>>> When I switch to the Flink runner (small remote Flink cluster) I get
>>>> this exception in the second pipeline:
>>>>
>>>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to
>>>> test.MyClass
>>>>
>>>> This happens in the first MapFunction immediately after reading data
>>>> from Kafka.
>>>>
>>>> I found out about this problem in Flink and how they resolved it, but I
>>>> am not sure how to fix it when using Beam:
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-1390
>>>>
>>>> test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a
>>>> very simple POJO.
>>>>
>>>> I am not sure how to fix this and still continue using AvroCoder.
>>>>
>>>> My Beam version is 0.6.0 and my Flink version is 1.2.0.
>>>>
>>>> Has anyone experienced something similar or has an idea how to
>>>> fix or work around this?
>>>>
>>>> thanks
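
For readers puzzled by "test.MyClass cannot be cast to test.MyClass": a message like this usually means the same class was defined by two different class loaders, which is the cause Jins George reports above. The following is a minimal sketch in plain JDK code, not Beam or Flink code. The names ClassLoaderCastDemo, IsolatingLoader and the nested MyClass are hypothetical stand-ins, and the two loaders only imitate the per-job user-code class loaders of a shared cluster. It assumes the compiled class file is reachable as a classpath resource.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class ClassLoaderCastDemo {

    /** Hypothetical stand-in for the POJO in the thread. */
    public static class MyClass {
        public MyClass() {}
    }

    /** Defines the target class itself instead of delegating to its parent. */
    static final class IsolatingLoader extends ClassLoader {
        private final String target;
        private final byte[] bytecode;

        IsolatingLoader(String target, byte[] bytecode, ClassLoader parent) {
            super(parent);
            this.target = target;
            this.bytecode = bytecode;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name, resolve); // e.g. java.lang.Object
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    c = defineClass(name, bytecode, 0, bytecode.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    /** Reads the compiled bytes of a class from the classpath. */
    static byte[] readBytecode(Class<?> type) throws IOException {
        String resource = type.getName().replace('.', '/') + ".class";
        try (InputStream in = type.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("class file not found: " + resource);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            for (int n = in.read(buffer); n != -1; n = in.read(buffer)) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        }
    }

    /** Creates a fresh loader that holds its own copy of MyClass. */
    static ClassLoader newIsolatingLoader() {
        try {
            return new IsolatingLoader(MyClass.class.getName(), readBytecode(MyClass.class),
                    ClassLoaderCastDemo.class.getClassLoader());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if an instance created via 'creator' cannot be cast via 'caster'. */
    static boolean castFails(ClassLoader creator, ClassLoader caster) {
        String name = MyClass.class.getName();
        try {
            Object instance = creator.loadClass(name).getDeclaredConstructor().newInstance();
            caster.loadClass(name).cast(instance);
            return false;
        } catch (ClassCastException e) {
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader jobA = newIsolatingLoader();
        ClassLoader jobB = newIsolatingLoader();
        System.out.println("same loader, cast fails: " + castFails(jobA, jobA));
        System.out.println("different loaders, cast fails: " + castFails(jobA, jobB));
    }
}
```

The sketch only reproduces the symptom. It says nothing about where AvroCoder or the Flink runner ends up holding a class from another job's loader, which is what BEAM-1970 tracks. It does suggest why running one job per YARN session helps: a fresh session has no class left over from an earlier job's loader.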
