And, thanks to Kenn, a likely workaround is in a pending pull request [1]. This should be fixed shortly at HEAD and be a part of the next release.
Davor

[1] https://github.com/apache/beam/pull/2783

On Sat, Apr 29, 2017 at 1:13 PM, Aljoscha Krettek wrote:

> There were some newer messages on the issue as well
> (https://issues.apache.org/jira/browse/BEAM-1970). The problem occurs if
> you reuse a Flink cluster for running the same job (or some other job that
> uses the same classes) again. The workaround would be to not reuse a
> cluster for several jobs. If the jar of a job has to be in the lib folder,
> this also means that a cluster cannot be reused for a new jar, since the
> cluster has to be restarted when you have a new job/jar, so this workaround
> would have roughly the same “complexity”.
>
> I think we’ll finally get rid of that once we move to a cluster-per-job
> model. :-)
>
> Best,
> Aljoscha
>
> On 29. Apr 2017, at 14:45, Stephan Ewen wrote:
>
> Avro tries to do some magic caching which does not work with multi
> classloader setups.
>
> A common workaround for these types of problems is to drop the relevant
> classes into Flink's "lib" folder and not have them in the job's jar.
>
> We wrote up some help on that:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html
>
> Some background: Flink has two different class loading modes:
>
> (1) All in the system class loader, which is used for the one-job-Yarn
> deployments (in the future also for Mesos / Docker / etc.)
>
> (2) Dynamic class loading, with one classloader per deployment. Used in
> all setups that follow the pattern "start Flink first, launch job later".
>
> On Sat, Apr 29, 2017 at 8:02 AM, Aljoscha Krettek wrote:
>
>> Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s
>> exactly the same problem.
>>
>> Best,
>> Aljoscha
>>
>> On 28. Apr 2017, at 22:29, Jins George wrote:
>>
>> I also faced a similar issue when restarting a Flink job from a savepoint
>> on an existing cluster.
>> The ClassCastException was with the KafkaCheckpointMark class. It was due
>> to the different class loaders. The workaround for me was to run one job
>> per Yarn session. For restart from savepoint, start a new Yarn session and
>> submit.
>>
>> Thanks,
>> Jins George
>>
>> On 04/28/2017 09:34 AM, Frances Perry wrote:
>>
>> I have the same problem and am working around it with SerializableCoder.
>> +1 to a real solution.
>>
>> On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek wrote:
>>
>>> I think you could. But we should also try finding a solution for this
>>> problem.
>>>
>>> On 28. Apr 2017, at 17:31, Borisa Zivkovic wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> this is probably the same problem I am facing.
>>>
>>> I execute multiple pipelines on the same Flink cluster - all launched at
>>> the same time...
>>>
>>> I guess I can try to switch to SerializableCoder and see how that works?
>>>
>>> thanks
>>>
>>> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek wrote:
>>>
>>>> Hi,
>>>> There is this open issue:
>>>> https://issues.apache.org/jira/browse/BEAM-1970. Could this also be
>>>> what is affecting you? Are you running several pipelines on the same
>>>> Flink cluster, either one after another or at the same time?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 28. Apr 2017, at 12:45, Borisa Zivkovic wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have this small pipeline that is writing data to Kafka (using
>>>> AvroCoder) and then another job is reading the same data from Kafka,
>>>> doing a few transformations and then writing data back to a different
>>>> Kafka topic (AvroCoder again).
>>>>
>>>> The first pipeline is very simple: read data from a text file, create a
>>>> POJO, use AvroCoder to write the POJO to Kafka.
>>>>
>>>> The second pipeline is also simple: read the POJO from Kafka, do a few
>>>> transformations, create a new POJO and write data to Kafka using
>>>> AvroCoder again.
>>>>
>>>> When I use the direct runner everything is OK.
>>>>
>>>> When I switch to the Flink runner (small remote Flink cluster) I get this
>>>> exception in the second pipeline:
>>>>
>>>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to test.MyClass
>>>>
>>>> This happens in the first MapFunction immediately after reading data
>>>> from Kafka.
>>>>
>>>> I found out about this problem in Flink and how they resolved it, but I'm
>>>> not sure how to fix this when using Beam?!
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-1390
>>>>
>>>> test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a
>>>> very simple POJO.
>>>>
>>>> Not sure how to fix this and still continue using AvroCoder.
>>>>
>>>> My Beam version is 0.6.0 - my Flink version is 1.2.0
>>>>
>>>> Has anyone experienced something similar or has an idea how to
>>>> fix/work around this?
>>>>
>>>> thanks
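The mechanism described in this thread (one classloader per job, plus a cache that outlives the job) can be reproduced in plain Java without Beam, Flink, or Avro. The sketch below is an illustration under stated assumptions, not Avro's or Flink's actual code. CACHE_BY_NAME is a hypothetical stand-in for a library cache keyed by class name. Each URLClassLoader plays the role of a per-job classloader. test.MyClass is compiled at runtime into a temporary directory that stands in for the job jar, so the sketch needs JDK 11 or later (for javax.tools and Files.writeString). The second scenario models the "lib folder" workaround by giving every job loader the same parent, which already contains the class.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class ClassLoaderCastDemo {

    // Hypothetical stand-in for a library cache keyed by class *name*. It lives in the
    // "cluster" (application) class loader, so it survives from one job to the next.
    static final Map<String, Class<?>> CACHE_BY_NAME = new HashMap<>();

    static Class<?> cachedLookup(String name, ClassLoader jobLoader) throws ClassNotFoundException {
        Class<?> cls = CACHE_BY_NAME.get(name);
        if (cls == null) {
            cls = Class.forName(name, true, jobLoader);
            CACHE_BY_NAME.put(name, cls);
        }
        return cls;
    }

    // Compiles a trivial test.MyClass into a temp directory that plays the role of the job jar.
    static Path compileJobClasses() throws Exception {
        Path dir = Files.createTempDirectory("job-classes");
        Path src = dir.resolve("MyClass.java");
        Files.writeString(src, "package test; public class MyClass { public MyClass() {} }");
        JavaCompiler javac = ToolProvider.getSystemJavaCompiler();
        if (javac == null) {
            throw new IllegalStateException("a JDK (not a JRE) is required");
        }
        if (javac.run(null, null, null, "-d", dir.toString(), src.toString()) != 0) {
            throw new IllegalStateException("compiling test.MyClass failed");
        }
        return dir;
    }

    // Runs one "job" in its own class loader and reports whether the record created through
    // the cache can be cast to the job's own view of test.MyClass.
    static boolean runJob(Path jobClasses, ClassLoader parent) throws Exception {
        URL[] jar = { jobClasses.toUri().toURL() };
        try (URLClassLoader jobLoader = new URLClassLoader(jar, parent)) {
            Class<?> ownView = Class.forName("test.MyClass", true, jobLoader);
            Object record = cachedLookup("test.MyClass", jobLoader)
                    .getDeclaredConstructor().newInstance();
            try {
                ownView.cast(record);
                return true;
            } catch (ClassCastException e) {
                System.out.println("  " + e.getMessage());
                return false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path jobClasses = compileJobClasses();
        ClassLoader platform = ClassLoader.getPlatformClassLoader();

        // Scenario 1: class only in the job "jar"; the same cluster is reused for a second job.
        CACHE_BY_NAME.clear();
        System.out.println("job 1: " + runJob(jobClasses, platform));
        System.out.println("job 2: " + runJob(jobClasses, platform));

        // Scenario 2: class also in a shared "lib" loader; with parent-first delegation
        // every job loader resolves test.MyClass to the single lib copy.
        CACHE_BY_NAME.clear();
        URL[] libJars = { jobClasses.toUri().toURL() };
        try (URLClassLoader lib = new URLClassLoader(libJars, platform)) {
            System.out.println("job 1 (lib): " + runJob(jobClasses, lib));
            System.out.println("job 2 (lib): " + runJob(jobClasses, lib));
        }
    }
}
```

In the first scenario, job 1 reports true and job 2 reports false. The ClassCastException message names test.MyClass on both sides of the cast, which matches the symptom in the original report. In the second scenario both jobs report true, because URLClassLoader delegates to its parent first, so every job sees the one copy in the shared loader. The temporary directory is not cleaned up, which keeps the sketch short.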
