And, thanks to Kenn, a likely workaround is in a pending pull request [1].
This should be fixed shortly at HEAD and be a part of the next release.

Davor

[1] https://github.com/apache/beam/pull/2783


On Sat, Apr 29, 2017 at 1:13 PM, Aljoscha Krettek <[email protected]>
wrote:

> There were some newer messages on the issue as well (
> https://issues.apache.org/jira/browse/BEAM-1970). The problem occurs if
> you reuse a Flink cluster for running the same job (or some other job that
> uses the same classes) again. The workaround would be to not reuse a
> cluster for several jobs. If the jar of a job has to be in the lib folder,
> this also means that a cluster cannot be reused for a new jar, since the
> cluster has to be restarted when you have a new job/jar, so this workaround
> would have roughly the same “complexity”.
>
> I think we’ll finally get rid of that once we move to a cluster-per-job
> model. :-)
>
> Best,
> Aljoscha
>
> On 29. Apr 2017, at 14:45, Stephan Ewen <[email protected]> wrote:
>
> Avro tries to do some magic caching which does not work with
> multi-classloader setups.
>
> A common workaround for these types of problems is to drop the relevant
> classes into Flink's "lib" folder and not have them in the job's jar.
>
> We wrote up some help on that: https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html
>
> Some background: Flink has two different class loading modes:
>
> (1) All in the system class loader, which is used for the one-job-Yarn
> deployments (in the future also for Mesos / Docker / etc)
>
> (2) Dynamic class loading, with one classloader per deployment. Used in
> all setups that follow the pattern "start Flink first, launch job later".
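The cast failure in mode (2) stems from the fact that a class defined by two different loaders yields two distinct Class objects that happen to share a name. A minimal, stdlib-only sketch of that mechanism (class names here are illustrative, not Flink's or Beam's internals):

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

// A loader that defines its own copy of one target class instead of
// delegating to its parent, simulating per-job dynamic class loading.
class IsolatingLoader extends ClassLoader {
    private final String target;

    IsolatingLoader(String target) {
        super(IsolatingLoader.class.getClassLoader());
        this.target = target;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!name.equals(target)) {
            return super.loadClass(name, resolve); // delegate everything else
        }
        try (InputStream in = getResourceAsStream(name.replace('.', '/') + ".class")) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) out.write(buf, 0, n);
            byte[] bytes = out.toByteArray();
            return defineClass(name, bytes, 0, bytes.length); // fresh copy
        } catch (Exception e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

public class ClassLoaderDemo {
    public static class Payload {} // stand-in for test.MyClass

    public static void main(String[] args) throws Exception {
        String name = Payload.class.getName();
        Class<?> a = new IsolatingLoader(name).loadClass(name);
        Class<?> b = new IsolatingLoader(name).loadClass(name);

        // Same fully qualified name, but two distinct Class objects:
        System.out.println(a.getName().equals(b.getName()));
        System.out.println(a == b);

        // An instance of a's class is not an instance of b's class,
        // so a cast across the two fails with "X cannot be cast to X".
        Object instance = a.getDeclaredConstructor().newInstance();
        System.out.println(b.isInstance(instance));
    }
}
```

Any cache keyed on the Class object (as Avro's is) breaks as soon as the "same" class arrives via a second loader.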
>
>
> On Sat, Apr 29, 2017 at 8:02 AM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s
>> exactly the same problem.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 28. Apr 2017, at 22:29, Jins George <[email protected]> wrote:
>>
>> I also faced a similar issue when restarting a Flink job from a savepoint
>> on an existing cluster. The ClassCastException was with the
>> KafkaCheckpointMark class. It was due to the different class loaders. The
>> workaround for me was to run one job per Yarn session. For restart from a
>> savepoint, start a new Yarn session and submit.
>>
>> Thanks,
>> Jins George
>>
>> On 04/28/2017 09:34 AM, Frances Perry wrote:
>>
>> I have the same problem and am working around it with SerializableCoder.
>> +1 to a real solution.
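SerializableCoder encodes with plain Java serialization, so the POJO has to implement Serializable; class resolution on decode then goes by class name through the current loader rather than by a cached Class object. A minimal stdlib-only sketch of that round-trip (Beam classes omitted; names here are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableRoundTrip {
    // Stand-in for the pipeline's POJO; Serializable is the one requirement.
    public static class MyPojo implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String value;
        public MyPojo(String value) { this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        MyPojo original = new MyPojo("hello");

        // Encode: write the object to bytes, as the coder would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }

        // Decode: the class is resolved by name in the reading context.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            MyPojo copy = (MyPojo) in.readObject();
            System.out.println(copy.value);
        }
    }
}
```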
>>
>> On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek < <[email protected]>
>> [email protected]> wrote:
>>
>>> I think you could. But we should also try finding a solution for this
>>> problem.
>>>
>>> On 28. Apr 2017, at 17:31, Borisa Zivkovic <[email protected]>
>>> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> this is probably the same problem I am facing.
>>>
>>> I execute multiple pipelines on the same Flink cluster - all launched at
>>> the same time...
>>>
>>> I guess I can try to switch to SerializableCoder and see how that works?
>>>
>>> thanks
>>>
>>>
>>>
>>> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <[email protected]> wrote:
>>>
>>>> Hi,
>>>> There is this open issue:
>>>> https://issues.apache.org/jira/browse/BEAM-1970. Could this also be
>>>> what is affecting you? Are you running several pipelines on the same Flink
>>>> cluster, either one after another or at the same time?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 28. Apr 2017, at 12:45, Borisa Zivkovic <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have this small pipeline that is writing data to Kafka (using
>>>> AvroCoder), and then another job is reading the same data from Kafka,
>>>> doing a few transformations, and then writing the data back to a
>>>> different Kafka topic (AvroCoder again).
>>>>
>>>> The first pipeline is very simple: read data from a text file, create a
>>>> POJO, use AvroCoder to write the POJO to Kafka.
>>>>
>>>> The second pipeline is also simple: read the POJO from Kafka, do a few
>>>> transformations, create a new POJO, and write the data to Kafka using
>>>> AvroCoder again.
>>>>
>>>> When I use direct runner everything is ok.
>>>>
>>>> When I switch to the Flink runner (small remote Flink cluster), I get
>>>> this exception in the second pipeline:
>>>>
>>>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to
>>>> test.MyClass
>>>>
>>>> This happens in the first MapFunction immediately after reading
>>>> data from Kafka.
>>>>
>>>> I found out about this problem in Flink and how they resolve it, but I'm
>>>> not sure how to fix this when using Beam:
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-1390
>>>>
>>>> test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a
>>>> very simple POJO.
>>>>
>>>> Not sure how to fix this and still continue using AvroCoder.
>>>>
>>>> My Beam version is 0.6.0; my Flink version is 1.2.0.
>>>>
>>>> Has anyone experienced something similar, or does anyone have an idea
>>>> how to fix or work around this?
>>>>
>>>> thanks
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>
>
