That DelegateCoder seems like a good idea to keep the DoFn code clean. In
the short term I will perhaps use json string as my intermediary and will
let Jackson do the serialization and deserialization. from what I have
learned so far is that Dataflow does some magic in the background to
re-order fields to make the PCollection schema compatible to the previous
job (if I use SchemaCoder), So I don't have to make a switch from Java
beans yet. I can use the DelegateCoder for my statespec and continue
using SchemaCoder for PCollections.

@Reuven
Regarding timers in stateful DoFn, do they get carried over to the new job
in Dataflow?




On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]> wrote:

> Hello Gaurav,
>
> It looks like the issues related to state evolution is something quite
> recurrent in this mailing list, and do not seem to be covered in the Beam
> docs, unfortunately.
>
> I am not familiar with Google Dataflow runner, but the way I achieved
> state evolution on Flink runner was a combo of:
> 1 - version-controlled Avro schemas for generating POJOs (models) which
> were used in stateful Beam DoFns
> 2 - a generic Avro-generated POJO as an intermediary - ( schema: string,
> payload: bytes )
> 3 -
> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html
> responsible for converting a model into an intermediary (2)
>
> This allowed adding & dropping nullable fields and to do other
> Avro-forward compatible changes without loosing the state. The order of
> fields also didn't matter in this setting. And this also meant double ser-
> and deserialization, unfortunately. But I wasn't able to come to any better
> solution, given that there seemed to be no public info available about the
> topic, or, maybe, due to the lack of my experience.
>
> Relevant mail threads:
> https://www.mail-archive.com/[email protected]/msg07249.html
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <[email protected]>
> wrote:
>
>>
>>
>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
>>
>>> Unfortunately, as you've discovered, Dataflow's update only pays
>>> attention to actual PCollections, and does not look at DoFn state variables
>>> at all (historically, stateful DoFns did not yet exist when Dataflow
>>> implemented its update support).
>>>
>>> You can file a feature request against Google to implement update on
>>> state variables, as this is honestly a major feature gap. In the meantime
>>> there might be some workarounds you can use until this support is added.
>>> e.g.
>>>    - you could convert the data into a protocol buffer before storing
>>> into state; ProtoCoder should handle new fields, and fields in a protocol
>>> buffer have a deterministic order.
>>>
>> Is there an easy way to go from a java object to a protocol buffer
>> object. The only way I can think of is to first serialize the java model to
>> json and then deserialize the json into a proto buffer object.  Same for
>> the other way around - protobuf to java object.  while I was writing this I
>> realized I can maybe store the json dump of my java object in state spec.
>> That should also work.
>>
>>>    - you could explicitly use the Row object, although this will again
>>> make your code more complex.
>>>
>>
>>> Another intermediate option - we could fairly easily add
>>> a @SchemaFieldId annotation to Beam that would allow you to specify the
>>> order of fields. You would have to annotate every getter, something like
>>> this:
>>>
>>> @ScheamFieldId(0)
>>> public T getV1();
>>>
>>> @SchemaFieldId(1)
>>> public T getV2();
>>>
>> Yes please. If this is an easy change and If you can point me in the
>> right direction I will be happy to submit a patch for this.
>>
>>
>>
>>>
>>> etc.
>>>
>>>
>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> SerializableCoder can be tricky for updates, and can also be very
>>>> inefficient.
>>>>
>>>> As you noticed, when inferring schemas from a Java class, the order of
>>>> fields is non deterministic (not much that we can do about this, as Java
>>>> doesn't return the fields in a deterministic order). For regular
>>>> PCollections, Dataflow (though currently only Dataflow) will allow update
>>>> even though the field orders have changed. Unfortunately
>>>>
>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> I now have SchemaCoder for all data flowing through the pipeline.
>>>>> I am noticing another issue once a new pipeline is started with
>>>>> --update flag. In my stateful DoFn where I am doing prevState.read(),
>>>>> exceptions are being thrown. errors like
>>>>> Caused by: java.io.EOFException: reached end of stream after reading
>>>>> 68 bytes; 101 bytes expected
>>>>> In this case there was no change in code or data model when
>>>>> relaunching the pipeline.
>>>>>
>>>>> It seems that Schema for my Bean is changing on every run of the
>>>>> pipeline. I manually inspected the Schema returned by the SchemaRegistry 
>>>>> by
>>>>> running the pipeline a few times locally and each time I noticed the order
>>>>> of fields in the Schema was different. This changing of order of fields is
>>>>> breaking the new pipeline. Is there a way to enforce some kind of ordering
>>>>> on the fields?
>>>>>
>>>>>
>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Looking at source code for the JavaBeamSchema it seems I will have to
>>>>>> annotate my getter and setters with any flavour of @Nullable.
>>>>>> Another question -
>>>>>> does the dataflow update pipeline feature work only with SchemaCoder?
>>>>>> I have SerializableCoder and ProtoCoder being used in some of my other
>>>>>> pipelines. I wonder if I have to change those too to open doors for
>>>>>> updating pipelines there?
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Some progress made.
>>>>>>> Now I can see that schema has all the needed fields. But all fields
>>>>>>> in the returned schema are marked as "NOT NULL". even though the fields 
>>>>>>> are
>>>>>>> annotated with @javax.annotation.Nullable.
>>>>>>> Is there a different flavor of @Nullable I need to use on my fields?
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>>>> DefaultSchema(JavaBeanSchema.class)
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>> @EqualsAndHashCode
>>>>>>>>> @ToString
>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>
>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>     private String field1;
>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>     private String field2;
>>>>>>>>>
>>>>>>>>>     @Nullable
>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>     private Integer field3;
>>>>>>>>>
>>>>>>>>>     .....
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>     public String getField1() {
>>>>>>>>>         return field1;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>         this.field1 = field1;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>     public String getField2() {
>>>>>>>>>         return field2;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>         this.field2 = field2;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>     public Integer getField3() {
>>>>>>>>>         return field3;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>         this.field3 = field3;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>     public POJO() {
>>>>>>>>>
>>>>>>>>>          0 arg empty constructor
>>>>>>>>>     }
>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>
>>>>>>>>>        ... copy constructor
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning a
>>>>>>>>>>> empty schema.
>>>>>>>>>>> my Pojo is annotated with
>>>>>>>>>>> the @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>> Is there something extra I need to do here to register my class
>>>>>>>>>>> with a schema registry.
>>>>>>>>>>> Note: the code which is building the pipeline is sitting in a
>>>>>>>>>>> library (Different package) which is being imported into my 
>>>>>>>>>>> pipeline code.
>>>>>>>>>>> So perhaps there is some configuration which is missing which 
>>>>>>>>>>> allows the
>>>>>>>>>>> framework to discover my Pojo and the annotations associated with 
>>>>>>>>>>> it.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>>>>>>>> building my coder from class
>>>>>>>>>>>>>> Something like
>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>>>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I said
>>>>>>>>>>>>> not work?
>>>>>>>>>>>>>
>>>>>>>>>>>>  No, annotation alone does not work since I am not using
>>>>>>>>>>>> concrete classes in the code where the pipeline is being 
>>>>>>>>>>>> constructed.
>>>>>>>>>>>> <Data> above is a template variable in the class which is 
>>>>>>>>>>>> constructing the
>>>>>>>>>>>> pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> removing setCoder call breaks my pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> No Coder has been manually specified;  you may do so using
>>>>>>>>>>>>>>> .setCoder().
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed: Unable
>>>>>>>>>>>>>>> to provide a Coder for Data.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Reason being the code which is building the pipeline is
>>>>>>>>>>>>>>> based on Java Generics. Actual pipeline building code sets a 
>>>>>>>>>>>>>>> bunch of
>>>>>>>>>>>>>>> parameters which are used to construct the pipeline.
>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>>>> So I guess I will need to provide some more information to
>>>>>>>>>>>>>>> the framework to make the annotation work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will usually 
>>>>>>>>>>>>>>>> automatically
>>>>>>>>>>>>>>>> set up schema inference (you'll have to remove the setCoder 
>>>>>>>>>>>>>>>> call).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> How to set up my pipeline to use Beam's schema encoding.
>>>>>>>>>>>>>>>>> In my current code I am doing something like this
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> PCollection<Data> =
>>>>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro coder.
>>>>>>>>>>>>>>>>>> Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests if 
>>>>>>>>>>>>>>>>>>> incompatible changes
>>>>>>>>>>>>>>>>>>> are made to Pojo.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Check the schema of the avro encoding for the POJO
>>>>>>>>>>>>>>>>>>>> before and after the change to ensure that they are 
>>>>>>>>>>>>>>>>>>>> compatible as you
>>>>>>>>>>>>>>>>>>>> expect.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess but asking
>>>>>>>>>>>>>>>>>>>>> here in hopes someone has faced a similar problem and can 
>>>>>>>>>>>>>>>>>>>>> help.
>>>>>>>>>>>>>>>>>>>>> I am trying to use "--update" option to update a
>>>>>>>>>>>>>>>>>>>>> running Dataflow job. I am noticing that compatibility 
>>>>>>>>>>>>>>>>>>>>> checks fail any time
>>>>>>>>>>>>>>>>>>>>> I add a new field to my data model. Error says
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for data.  Avro coder to serialize 
>>>>>>>>>>>>>>>>>>>>> the model. I read somewhere that adding new optional 
>>>>>>>>>>>>>>>>>>>>> fields to the data should work when updating the pipeline.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or implementation of 
>>>>>>>>>>>>>>>>>>>>> the model to something which allows me to update the 
>>>>>>>>>>>>>>>>>>>>> pipeline in cases when I add new optional fields to 
>>>>>>>>>>>>>>>>>>>>> existing model. Any suggestions?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>

Reply via email to