I think you could. But we should also try finding a solution for this problem.

> On 28. Apr 2017, at 17:31, Borisa Zivkovic <[email protected]> wrote:
> 
> Hi Aljoscha,
> 
> this is probably the same problem I am facing.
> 
> I execute multiple pipelines on the same Flink cluster - all launched at the 
> same time... 
> 
> I guess I can try to switch to SerializableCoder and see how that works?
> 
> thanks
> 
> 
> 
> On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi,
> There is this open issue: https://issues.apache.org/jira/browse/BEAM-1970 
> <https://issues.apache.org/jira/browse/BEAM-1970>. Could this also be what is 
> affecting you? Are you running several pipelines on the same Flink cluster, 
> either one after another or at the same time?
> 
> Best,
> Aljoscha
>> On 28. Apr 2017, at 12:45, Borisa Zivkovic <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Hi,
>> 
>> I have this small pipeline that is writing data to Kafka (using AvroCoder) 
>> and then another job is reading the same data from Kafka, doing few 
>> transformations and then writing data back to different Kafka topic 
>> (AvroCoder again).
>> 
>> First pipeline is very simple, read data from a text file, create POJO, use 
>> AvroCoder to write POJO to Kafka.
>> 
>> Second pipeline is also simple, read POJO from Kafka, do few 
>> transformations, create new POJO and write data to Kafka using AvroCoder 
>> again.
>> 
>> When I use direct runner everything is ok.
>> 
>> When I switch to flink runner (small remote flink cluster) I get this 
>> exception in the second pipeline
>> 
>> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to 
>> test.MyClass
>> 
>> This happens in the the first MapFunction immediately after reading data 
>> from Kafka.
>> 
>> I found about this problem in Flink and how they resolve it but not sure how 
>> to fix this when using Beam?!
>> 
>> https://issues.apache.org/jira/browse/FLINK-1390 
>> <https://issues.apache.org/jira/browse/FLINK-1390>
>> 
>> test.MyClass has annotation @DefaultCoder(AvroCoder.class) and is very 
>> simple POJO.
>> 
>> Not sure how to fix this and still continue using AvroCoder.
>> 
>> My beam version is 0.6.0 - my flink version is 1.2.0
>> 
>> Anyone experienced something similar or has idea how to fix/workaround this?
>> 
>> thanks
>> 
> 

Reply via email to