I also faced a similar issue when restarting a Flink job from a savepoint
on an existing cluster. The ClassCastException was with the
KafkaCheckpointMark class. It was due to different class loaders.
The workaround for me was to run one job per YARN session. To restart
from a savepoint, start a new YARN session and submit.
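As a toy illustration of why a cast can fail even though the class names
match, here is a small sketch (the classpath URL is just a placeholder):

import java.net.URL;
import java.net.URLClassLoader;

public class TwoLoadersDemo {
  public static void main(String[] args) throws Exception {
    // Placeholder directory containing test/MyClass.class
    URL[] classpath = { new URL("file:/tmp/classes/") };

    // Two independent loaders (null parent = bootstrap only), so neither
    // delegates to the other.
    ClassLoader loaderA = new URLClassLoader(classpath, null);
    ClassLoader loaderB = new URLClassLoader(classpath, null);

    Class<?> fromA = loaderA.loadClass("test.MyClass");
    Class<?> fromB = loaderB.loadClass("test.MyClass");

    System.out.println(fromA == fromB); // false: two distinct Class objects

    Object instance = fromA.getDeclaredConstructor().newInstance();
    fromB.cast(instance); // ClassCastException, even though both names are test.MyClass
  }
}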
Thanks,
Jins George
On 04/28/2017 09:34 AM, Frances Perry wrote:
I have the same problem and am working around it with
SerializableCoder. +1 to a real solution.
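For reference, the workaround is roughly this (MyRecord and its fields are
made up, not from real code):

import java.io.Serializable;

import org.apache.beam.sdk.coders.SerializableCoder;

// Example POJO; SerializableCoder requires it to implement Serializable.
public class MyRecord implements Serializable {
  private String id;
  private long count;

  public MyRecord() {}  // coders generally need a no-arg constructor
}

// Then, wherever the pipeline produces a PCollection<MyRecord>, set the coder
// explicitly instead of relying on @DefaultCoder(AvroCoder.class):
//   records.setCoder(SerializableCoder.of(MyRecord.class));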
On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek <[email protected]> wrote:
I think you could. But we should also try finding a solution for
this problem.
On 28. Apr 2017, at 17:31, Borisa Zivkovic <[email protected]> wrote:
Hi Aljoscha,
This is probably the same problem I am facing. I execute multiple
pipelines on the same Flink cluster, all launched at the same time...
I guess I can try to switch to SerializableCoder and see how that works?
thanks
On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <[email protected]> wrote:
Hi,
There is this open issue:
https://issues.apache.org/jira/browse/BEAM-1970. Could this also be
what is affecting you? Are you running several pipelines on the same
Flink cluster, either one after another or at the same time?
Best,
Aljoscha
On 28. Apr 2017, at 12:45, Borisa Zivkovic <[email protected]> wrote:
Hi,
I have a small pipeline that writes data to Kafka (using AvroCoder),
and then another job reads the same data from Kafka, does a few
transformations, and writes the data back to a different Kafka topic
(AvroCoder again).
The first pipeline is very simple: read data from a text file, create
a POJO, and use AvroCoder to write the POJO to Kafka.
The second pipeline is also simple: read the POJO from Kafka, do a few
transformations, create a new POJO, and write the data to Kafka using
AvroCoder again.
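To give a rough idea, the read side of the second pipeline looks something
like this (broker and topic names are placeholders, and I am sketching the
KafkaIO calls from memory, so method names may not match 0.6.0 exactly):

import com.google.common.collect.ImmutableList;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;

// Sketch only: "broker:9092" and "input-topic" are placeholders; MyClass is
// the @DefaultCoder(AvroCoder.class) POJO mentioned below.
PCollection<MyClass> records = p
    .apply(KafkaIO.<String, MyClass>read()
        .withBootstrapServers("broker:9092")
        .withTopics(ImmutableList.of("input-topic"))
        .withKeyCoder(StringUtf8Coder.of())
        .withValueCoder(AvroCoder.of(MyClass.class))
        .withoutMetadata())
    .apply(Values.<MyClass>create());
// The ClassCastException is thrown in the first map applied to 'records'.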
When I use the direct runner everything is OK.
When I switch to the Flink runner (small remote Flink cluster) I get
this exception in the second pipeline:
Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to test.MyClass
This happens in the first MapFunction immediately after reading data
from Kafka.
I found out about this problem in Flink and how they resolved it, but
I am not sure how to fix it when using Beam:
https://issues.apache.org/jira/browse/FLINK-1390
test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a
very simple POJO.
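It looks roughly like this (field names changed):

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// Simplified version of the POJO; the real field names are different.
@DefaultCoder(AvroCoder.class)
public class MyClass {
  private String id;
  private long value;

  public MyClass() {}  // AvroCoder needs a no-arg constructor
}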
I am not sure how to fix this while still using AvroCoder.
My Beam version is 0.6.0 and my Flink version is 1.2.0.
Has anyone experienced something similar, or has an idea how to
fix or work around this?
thanks