Hi,

I have a small pipeline that writes data to Kafka (using AvroCoder), and a
second job that reads the same data from Kafka, applies a few
transformations, and writes the results to a different Kafka topic
(AvroCoder again).

The first pipeline is very simple: it reads data from a text file, creates a
POJO, and uses AvroCoder to write the POJO to Kafka.

The second pipeline is also simple: it reads the POJO from Kafka, applies a
few transformations, creates a new POJO, and writes it to Kafka using
AvroCoder again.

When I use the direct runner, everything works.

When I switch to the Flink runner (a small remote Flink cluster), I get this
exception in the second pipeline:

Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to
test.MyClass

This happens in the first MapFunction, immediately after reading the data
from Kafka.
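
As far as I understand, a "X cannot be cast to X" message usually means the
same class name was defined by two different class loaders. Below is a
minimal sketch of a check I could call from that first function to confirm
this. ClassLoaderCheck and its methods are hypothetical helper names of my
own, not part of Beam or Flink, and the String in main is only a stand-in
for the real element.

```java
public final class ClassLoaderCheck {
    private ClassLoaderCheck() {}

    /** Returns "name @ loader"; classes from the bootstrap loader report "bootstrap". */
    public static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        return type.getName() + " @ " + (loader == null ? "bootstrap" : loader.toString());
    }

    /** True when both classes share a name but were defined by different class loaders. */
    public static boolean sameNameDifferentLoader(Class<?> a, Class<?> b) {
        return a.getName().equals(b.getName())
                && a.getClassLoader() != b.getClassLoader();
    }

    public static void main(String[] args) {
        // Stand-in for the element received in the function; in the real job this
        // would be the object whose cast to test.MyClass fails (assumption).
        Object element = "stand-in element";

        // Compare the runtime class of the element with the class the code expects.
        System.out.println(describe(element.getClass()));
        System.out.println(sameNameDifferentLoader(element.getClass(), String.class));
    }
}
```

If sameNameDifferentLoader returned true for the element's class and
test.MyClass, that would point to a class loading problem rather than to
the Avro encoding itself.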

I found that this problem has been reported for Flink itself, along with how
it was resolved there, but I am not sure how to fix it when using Beam:

https://issues.apache.org/jira/browse/FLINK-1390

test.MyClass is annotated with @DefaultCoder(AvroCoder.class) and is a very
simple POJO.

I am not sure how to fix this while still using AvroCoder.

My Beam version is 0.6.0 and my Flink version is 1.2.0.

Has anyone experienced something similar, or does anyone have an idea how to
fix or work around this?

Thanks
