There were some newer messages on the issue as well (https://issues.apache.org/jira/browse/BEAM-1970). The problem occurs if you reuse a Flink cluster to run the same job (or another job that uses the same classes) again. The workaround would be to not reuse a cluster for several jobs. If the jar of a job has to be in the lib folder, this also means that a cluster cannot be reused for a new jar, since the cluster has to be restarted when you have a new job/jar, so this workaround has roughly the same "complexity".
I think we’ll finally get rid of that once we move to a cluster-per-job model. :-)

Best,
Aljoscha

> On 29. Apr 2017, at 14:45, Stephan Ewen <[email protected]> wrote:
>
> Avro tries to do some magic caching which does not work with multi
> classloader setups.
>
> A common workaround for these types of problems is to drop the relevant
> classes into Flink's "lib" folder and not have them in the job's jar.
>
> We wrote up some help on that:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html
>
> Some background: Flink has two different class loading modes:
>
> (1) All in the system class loader, which is used for the one-job-YARN
> deployments (in the future also for Mesos / Docker / etc.)
>
> (2) Dynamic class loading, with one classloader per deployment. Used in all
> setups that follow the pattern "start Flink first, launch job later".
>
>
> On Sat, Apr 29, 2017 at 8:02 AM, Aljoscha Krettek <[email protected]> wrote:
> Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s exactly
> the same problem.
>
> Best,
> Aljoscha
>
>
>> On 28. Apr 2017, at 22:29, Jins George <[email protected]> wrote:
>>
>> I also faced a similar issue when restarting a Flink job from a savepoint
>> on an existing cluster. The ClassCastException was with the
>> KafkaCheckpointMark class. It was due to the different class loaders. The
>> workaround for me was to run one job per YARN session. For a restart from
>> a savepoint, start a new YARN session and submit.
>>
>> Thanks,
>> Jins George
>>
>> On 04/28/2017 09:34 AM, Frances Perry wrote:
>>> I have the same problem and am working around it with SerializableCoder. +1
>>> to a real solution.
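Stephan's mode (2) is what bites here: with one classloader per deployment, the same class name can be loaded twice, and two Class objects with identical names are still distinct types to the JVM. The following self-contained sketch (plain Java, no Flink involved — the IsolatingLoader and the MyClass stand-in are illustrative, not Flink internals) reproduces the "X cannot be cast to X" effect from the thread:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class XCannotBeCastToX {

    // A child-first classloader for one specific class, loosely simulating
    // Flink's per-deployment dynamic classloading: instead of delegating to
    // the parent, it defines the class itself from its bytecode on the classpath.
    static class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(String target) { this.target = target; }

        @Override
        public Class<?> loadClass(String name) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name); // everything else: normal delegation
            }
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = ClassLoader.getSystemResourceAsStream(resource)) {
                if (in == null) throw new ClassNotFoundException(name);
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    // Stand-in for the user's POJO (test.MyClass in the thread).
    public static class MyClass { }

    public static void main(String[] args) throws Exception {
        String name = MyClass.class.getName();
        // Two "deployments" loading the same class independently:
        Class<?> a = new IsolatingLoader(name).loadClass(name);
        Class<?> b = new IsolatingLoader(name).loadClass(name);

        System.out.println("same name:  " + a.getName().equals(b.getName()));
        System.out.println("same class: " + (a == b));

        Object instanceOfA = a.getDeclaredConstructor().newInstance();
        try {
            b.cast(instanceOfA); // fails: "MyClass cannot be cast to MyClass"
            System.out.println("cast ok");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```

This is why Stephan's lib-folder workaround helps: classes in lib/ are loaded once by the system classloader, so every job sees the same Class object. Avro's class caching keeps a reference tied to one of the per-job loaders, triggering exactly this cast failure on the next deployment.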
>>>
>>> On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek <[email protected]> wrote:
>>> I think you could. But we should also try finding a solution for this
>>> problem.
>>>
>>>> On 28. Apr 2017, at 17:31, Borisa Zivkovic <[email protected]> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> this is probably the same problem I am facing.
>>>>
>>>> I execute multiple pipelines on the same Flink cluster - all launched at
>>>> the same time...
>>>>
>>>> I guess I can try to switch to SerializableCoder and see how that works?
>>>>
>>>> thanks
>>>>
>>>>
>>>> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <[email protected]> wrote:
>>>> Hi,
>>>> There is this open issue:
>>>> https://issues.apache.org/jira/browse/BEAM-1970. Could this also be
>>>> what is affecting you? Are you running several pipelines on the same Flink
>>>> cluster, either one after another or at the same time?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>> On 28. Apr 2017, at 12:45, Borisa Zivkovic <[email protected]> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have this small pipeline that is writing data to Kafka (using
>>>>> AvroCoder), and then another job is reading the same data from Kafka,
>>>>> doing a few transformations, and then writing the data back to a
>>>>> different Kafka topic (AvroCoder again).
>>>>>
>>>>> The first pipeline is very simple: read data from a text file, create a
>>>>> POJO, use AvroCoder to write the POJO to Kafka.
>>>>>
>>>>> The second pipeline is also simple: read the POJO from Kafka, do a few
>>>>> transformations, create a new POJO, and write the data to Kafka using
>>>>> AvroCoder again.
>>>>>
>>>>> When I use the direct runner everything is OK.
>>>>>
>>>>> When I switch to the Flink runner (small remote Flink cluster) I get this
>>>>> exception in the second pipeline:
>>>>>
>>>>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to
>>>>> test.MyClass
>>>>>
>>>>> This happens in the first MapFunction immediately after reading data
>>>>> from Kafka.
>>>>>
>>>>> I found out about this problem in Flink and how they resolve it, but I am
>>>>> not sure how to fix this when using Beam:
>>>>>
>>>>> https://issues.apache.org/jira/browse/FLINK-1390
>>>>>
>>>>> test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a
>>>>> very simple POJO.
>>>>>
>>>>> Not sure how to fix this and still continue using AvroCoder.
>>>>>
>>>>> My Beam version is 0.6.0 - my Flink version is 1.2.0.
>>>>>
>>>>> Has anyone experienced something similar or has an idea how to
>>>>> fix/work around this?
>>>>>
>>>>> thanks
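The workaround Frances and Borisa settle on is switching the POJO from AvroCoder to SerializableCoder, i.e. annotating it @DefaultCoder(SerializableCoder.class) and implementing Serializable instead of @DefaultCoder(AvroCoder.class). The sketch below (the MyClass fields are invented for illustration; plain java.io serialization stands in for the coder's encode/decode round trip, since SerializableCoder is built on it) shows the shape of such a POJO and the round trip it has to survive. One plausible reading of why this avoids the problem, which the thread does not spell out, is that ObjectInputStream resolves the class freshly on each decode rather than holding a cached Class reference from an earlier deployment's classloader the way Avro's caching does:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCoderSketch {

    // Stand-in for test.MyClass from the thread. With Beam you would annotate
    // it @DefaultCoder(SerializableCoder.class); the fields are hypothetical.
    public static class MyClass implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int value;

        MyClass(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    public static void main(String[] args) throws Exception {
        MyClass original = new MyClass("example", 42);

        // Encode: roughly what SerializableCoder does, modulo framing.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }

        // Decode: the class is resolved through the current classloader at
        // read time, not through a Class object cached at write time.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            MyClass copy = (MyClass) in.readObject();
            System.out.println(copy.name + ":" + copy.value); // prints example:42
        }
    }
}
```

The trade-off, compared to AvroCoder, is that Java serialization is slower, produces larger payloads, and offers no schema evolution, so it is a workaround rather than the "real solution" the thread asks for.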
