Hi Carlos,

We would like for this update to be rejected on submission, as it is...

from: SerializableCoder.of(MessageWithAttributes.class[serialVersionUID=A])
to: SerializableCoder.of(MessageWithAttributes.class[serialVersionUID=B])

... which are incompatible coders as far as the system can tell. If these
were the coders on a PCollection, then it would be rejected. Unfortunately
we don't currently have introspection into the coders of your state cells
in a stateful ParDo.

On your side, there are a few things you need to do beyond waiting for
better validation from our end (since that would just block the update
entirely anyhow). Whenever you expect to use Java serialization for
something other than just "throwing it across the wire", such as storing it
in persistent state, you need to:
(a) explicitly define the serialized form (I think the methods are
writeReplace and readResolve)
(b) explicitly set the serialVersionUID to indicate this
(c) use a serialized form that is flexible (like protocol buffers) - Java
will *never* let you change serialVersionUID
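To make (a) and (b) concrete, here is a minimal sketch of the serialization
proxy pattern with an explicitly pinned serialVersionUID. The field names on
MessageWithAttributes are assumptions for illustration; the point is that
writeReplace/readResolve give you an explicit, stable serialized form that
doesn't shift when you add methods to the outer class:

```java
import java.io.*;
import java.util.*;

// Hypothetical message class; payload/attributes fields are assumptions.
class MessageWithAttributes implements Serializable {
    // Pin the version explicitly so adding methods or recompiling
    // does not change the generated serialVersionUID.
    private static final long serialVersionUID = 1L;

    final String payload;
    final Map<String, String> attributes;

    MessageWithAttributes(String payload, Map<String, String> attributes) {
        this.payload = payload;
        this.attributes = attributes;
    }

    // Replace the instance with a stable, explicit proxy before writing.
    private Object writeReplace() throws ObjectStreamException {
        return new SerializedForm(payload, new HashMap<>(attributes));
    }

    // The proxy owns the wire format; changes to the outer class no
    // longer affect what is written into persistent state.
    private static class SerializedForm implements Serializable {
        private static final long serialVersionUID = 1L;
        final String payload;
        final HashMap<String, String> attributes;

        SerializedForm(String payload, HashMap<String, String> attributes) {
            this.payload = payload;
            this.attributes = attributes;
        }

        private Object readResolve() throws ObjectStreamException {
            return new MessageWithAttributes(payload, attributes);
        }
    }
}
```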

Given all of that, you might as well write a coder since it integrates well
with Beam and is more efficient and IMO less confusing than Java's
serialization, especially if you are working in Scala. So, essentially,
don't use SerializableCoder if you expect to use pipeline update. To
support upgrade and rollback, I would recommend protocol buffers as a lot
of wisdom is baked into the design.
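For reference, a hand-written coder boils down to explicit encode/decode
methods over streams. In Beam you would extend
org.apache.beam.sdk.coders.CustomCoder<MessageWithAttributes>; the sketch
below uses only java.io so it is self-contained, and the field names are
assumptions, but the encode/decode bodies have the same shape:

```java
import java.io.*;
import java.util.*;

// Sketch of a hand-written coder with an explicit wire format:
// payload, then attribute count, then key/value pairs.
class MessageWithAttributesCoder {

    static class MessageWithAttributes {
        final String payload;
        final Map<String, String> attributes;
        MessageWithAttributes(String payload, Map<String, String> attributes) {
            this.payload = payload;
            this.attributes = attributes;
        }
    }

    void encode(MessageWithAttributes value, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeUTF(value.payload);
        data.writeInt(value.attributes.size());
        for (Map.Entry<String, String> e : value.attributes.entrySet()) {
            data.writeUTF(e.getKey());
            data.writeUTF(e.getValue());
        }
        data.flush();
    }

    MessageWithAttributes decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        String payload = data.readUTF();
        int n = data.readInt();
        Map<String, String> attributes = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            attributes.put(data.readUTF(), data.readUTF());
        }
        return new MessageWithAttributes(payload, attributes);
    }
}
```

Because you own the byte layout, you can evolve it deliberately (e.g. add a
version byte up front) instead of being at the mercy of Java's generated
serialVersionUID.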

Kenn

On Wed, Feb 14, 2018 at 3:59 AM, Carlos Alonso <car...@mrcalonso.com> wrote:

> Also I think this situation leads to a blocked state in which elements
> with two different serialVersionUIDs are mixed and therefore unable to
> make progress.
>
> When I realised this was happening, I tried to rollback to the previous
> version of the pipeline to drain it and then re-create the job with the new
> version, but during that time, as new elements were streamed in, the old
> version wasn't able to decode the new ones.
>
> Does it make any sense?
>
> Thanks!
>
> On Wed, Feb 14, 2018 at 12:50 PM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Another thing I've realised is that the stacktrace suggests it is using
>> SerializableCoder instead of the MessageWithAttributesCoder custom one I
>> have implemented. See it here: https://pastebin.com/86gWddi9
>>
>> Is it possible that, upon updates, custom coders are not chosen? To give
>> a little more context: the MessageWithAttributes object is contained within
>> a KV and kept in a stateful/timely processing step. The coder is set
>> before that stateful step via .setCoder(KvCoder.of(StringUtf8Coder.of(),
>> MessageWithAttributesCoder.of())) and the exception is thrown when the
>> buffered items are flushed out.
>>
>> Thanks!
>>
>> On Wed, Feb 14, 2018 at 11:33 AM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> I've added a couple of methods to a case class and updated the job on
>>> Dataflow and started getting
>>>
>>> java.lang.IllegalStateException: Unable to decode tag list using
>>> org.apache.beam.sdk.coders.SerializableCoder@4ad81832
>>>
>>> Caused by java.io.InvalidClassException: my.package.MessageWithAttributes;
>>> local class incompatible: stream classdesc serialVersionUID =
>>> -5160195612720162441, local class serialVersionUID = -9104690746829156208
>>>
>>> It seems that, although the data contained in the class has not changed,
>>> the class itself has changed and it cannot be deserialised anymore. How
>>> should I proceed to avoid this situation?
>>>
>>> Thanks!
>>>
>>