(For reference, I'm in 1.0.3)

I have a job that looks like this:


DataStream<A> input = ...

input

    .map(MapFunction<A,B>...)

    .addSink(...);


input

    .map(MapFunction<A,C>...)

    ?.addSink(...);


If I do not call enableObjectReuse() it works, if I do call enableObjectReuse() 
it throws:

java.lang.ClassCastException: B cannot be cast to A

when attempting to process the second MapFunction<A,C>


It looks like in the input operator is calling Output<StreamRecord<A>>.collect, 
which will pass the value to the first MapFunction and then the second 
MapFunction.  However, the first map function calls StreamRecord<>.replace() 
which mutates the stored value to the output of the function.  When 
Output<StreamRecord<A>>.collect passes this to the next MapFunction it is now 
as if the two MapFunctions are in serial and not parallel.


I looked into JIRA and didn't see an issue that looked exactly like this.  Is 
this known?


-Bart




________________________________
This e-mail may contain CONFIDENTIAL AND PROPRIETARY INFORMATION and/or 
PRIVILEGED AND CONFIDENTIAL COMMUNICATION intended solely for the recipient 
and, therefore, may not be retransmitted to any party outside of the 
recipient's organization without the prior written consent of the sender. If 
you have received this e-mail in error please notify the sender immediately by 
telephone or reply e-mail and destroy the original message without making a 
copy. Deep Silver, Inc. accepts no liability for any losses or damages 
resulting from infected e-mail transmissions and viruses in e-mail attachments.

Reply via email to