Hello Gaurav,

It looks like issues related to state evolution come up quite often on this
mailing list, and unfortunately they do not seem to be covered in the Beam
docs.

I am not familiar with the Google Dataflow runner, but the way I achieved
state evolution on the Flink runner was a combo of:

1 - version-controlled Avro schemas for generating the POJOs (models) used
in stateful Beam DoFns
2 - a generic Avro-generated POJO as an intermediary - ( schema: string,
payload: bytes )
3 - a DelegateCoder
(https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html)
responsible for converting a model into the intermediary (2)

This allowed adding and dropping nullable fields, and making other Avro
forward-compatible changes, without losing the state. The order of fields
also didn't matter in this setting. It also meant double serialization and
deserialization, unfortunately, but I wasn't able to come to any better
solution, given that there seemed to be no public info available about the
topic - or maybe due to my lack of experience.
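A minimal sketch of that combination - here with a hand-written envelope
class and Avro reflection instead of Avro-generated SpecificRecords, and
with hypothetical names (Envelope, MyModel, EvolvableCoders) rather than
the actual production code - could look like:

import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DelegateCoder;

/** Generic intermediary: the writer's schema travels with every value. */
class Envelope implements Serializable {
  public String schema;  // writer schema as JSON
  public byte[] payload; // Avro binary encoding of the model
}

class EvolvableCoders {
  static Coder<MyModel> of() {
    // Kept as a JSON string so the lambdas below stay serializable; a
    // production version would cache the parsed Schema per worker.
    final String currentSchemaJson =
        ReflectData.get().getSchema(MyModel.class).toString();

    return DelegateCoder.of(
        AvroCoder.of(Envelope.class),
        // Model -> envelope: encode with the current schema and attach it.
        (MyModel model) -> {
          Schema schema = new Schema.Parser().parse(currentSchemaJson);
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
          new ReflectDatumWriter<MyModel>(schema).write(model, enc);
          enc.flush();
          Envelope envelope = new Envelope();
          envelope.schema = currentSchemaJson;
          envelope.payload = out.toByteArray();
          return envelope;
        },
        // Envelope -> model: resolve the (possibly older) writer schema
        // against the current reader schema. This resolution step is what
        // tolerates added/dropped nullable fields and reordered fields.
        (Envelope envelope) -> {
          Schema writer = new Schema.Parser().parse(envelope.schema);
          Schema reader = new Schema.Parser().parse(currentSchemaJson);
          BinaryDecoder dec =
              DecoderFactory.get().binaryDecoder(envelope.payload, null);
          return new ReflectDatumReader<MyModel>(writer, reader)
              .read(null, dec);
        });
  }
}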
Relevant mail threads:
https://www.mail-archive.com/[email protected]/msg07249.html

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

On Mon, 10 Jan 2022 at 08:43, gaurav mishra <[email protected]>
wrote:

> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
>
>> Unfortunately, as you've discovered, Dataflow's update only pays
>> attention to actual PCollections, and does not look at DoFn state
>> variables at all (historically, stateful DoFns did not yet exist when
>> Dataflow implemented its update support).
>>
>> You can file a feature request against Google to implement update on
>> state variables, as this is honestly a major feature gap. In the
>> meantime there might be some workarounds you can use until this
>> support is added, e.g.
>> - you could convert the data into a protocol buffer before storing it
>> into state; ProtoCoder should handle new fields, and fields in a
>> protocol buffer have a deterministic order.
>>
> Is there an easy way to go from a Java object to a protocol buffer
> object? The only way I can think of is to first serialize the Java
> model to JSON and then deserialize the JSON into a protobuf object, and
> the same for the other way around - protobuf to Java object. While I
> was writing this I realized I could maybe store the JSON dump of my
> Java object in the state spec. That should also work.
>
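For what it's worth, a rough sketch of that JSON round trip, assuming a
hypothetical MyModelProto generated by protoc from a .proto file
(JsonFormat lives in the protobuf-java-util artifact):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.util.JsonFormat;

class ProtoBridge {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // POJO -> proto, via a JSON intermediary.
  static MyModelProto toProto(MyModel model) throws Exception {
    String json = MAPPER.writeValueAsString(model);
    MyModelProto.Builder builder = MyModelProto.newBuilder();
    // Tolerate JSON fields the proto no longer declares.
    JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
    return builder.build();
  }

  // Proto -> POJO, again via JSON. preservingProtoFieldNames() keeps
  // snake_case names, matching @JsonProperty("field_1")-style Jackson
  // mappings.
  static MyModel fromProto(MyModelProto proto) throws Exception {
    String json =
        JsonFormat.printer().preservingProtoFieldNames().print(proto);
    return MAPPER.readValue(json, MyModel.class);
  }
}

The state cell itself would then be declared with
StateSpecs.value(ProtoCoder.of(MyModelProto.class)), so the bytes in state
follow protobuf's field-number-based evolution rules.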
>> - you could explicitly use the Row object, although this will again
>> make your code more complex.
>>
>> Another intermediate option - we could fairly easily add a
>> @SchemaFieldId annotation to Beam that would allow you to specify the
>> order of fields. You would have to annotate every getter, something
>> like this:
>>
>> @SchemaFieldId(0)
>> public T getV1();
>>
>> @SchemaFieldId(1)
>> public T getV2();
>>
> Yes please. If this is an easy change and you can point me in the right
> direction, I will be happy to submit a patch for this.
>
>> etc.
>>
>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>>
>>> SerializableCoder can be tricky for updates, and can also be very
>>> inefficient.
>>>
>>> As you noticed, when inferring schemas from a Java class, the order
>>> of fields is non-deterministic (not much that we can do about this,
>>> as Java doesn't return the fields in a deterministic order). For
>>> regular PCollections, Dataflow (though currently only Dataflow) will
>>> allow update even though the field orders have changed. Unfortunately
>>>
>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> I now have SchemaCoder for all data flowing through the pipeline.
>>>> I am noticing another issue once a new pipeline is started with the
>>>> --update flag. In my stateful DoFn where I am doing
>>>> prevState.read(), exceptions are being thrown - errors like
>>>> Caused by: java.io.EOFException: reached end of stream after reading
>>>> 68 bytes; 101 bytes expected
>>>> In this case there was no change in code or data model when
>>>> relaunching the pipeline.
>>>>
>>>> It seems that the Schema for my bean is changing on every run of the
>>>> pipeline. I manually inspected the Schema returned by the
>>>> SchemaRegistry by running the pipeline a few times locally, and each
>>>> time I noticed the order of fields in the Schema was different. This
>>>> changing of the order of fields is breaking the new pipeline. Is
>>>> there a way to enforce some kind of ordering on the fields?
>>>>
>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> Looking at the source code for JavaBeanSchema, it seems I will have
>>>>> to annotate my getters and setters with some flavour of @Nullable.
>>>>> Another question -
>>>>> does the Dataflow update-pipeline feature work only with
>>>>> SchemaCoder? I have SerializableCoder and ProtoCoder being used in
>>>>> some of my other pipelines. I wonder if I have to change those too
>>>>> to open the door to updating pipelines there?
>>>>>
>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Some progress made.
>>>>>> Now I can see that the schema has all the needed fields, but all
>>>>>> fields in the returned schema are marked as "NOT NULL", even
>>>>>> though the fields are annotated with @javax.annotation.Nullable.
>>>>>> Is there a different flavor of @Nullable I need to use on my
>>>>>> fields?
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>>> @DefaultSchema(JavaBeanSchema.class).
>>>>>>>
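A minimal sketch of a bean that JavaBeanSchema can infer, with hypothetical
field names. Note that the @Nullable has to sit on the getter (and setter
parameter): JavaBeanSchema inspects getters and setters rather than
private fields, which would explain the NOT NULL columns above.

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@DefaultSchema(JavaBeanSchema.class)
public class MyBean implements Serializable {
  private String id;
  private Integer count;

  public MyBean() {} // zero-arg constructor used to instantiate the bean

  public String getId() { return id; }

  public void setId(String id) { this.id = id; }

  @Nullable
  public Integer getCount() { return count; }

  public void setCount(@Nullable Integer count) { this.count = count; }
}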
>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>>>
>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>> @EqualsAndHashCode
>>>>>>>> @ToString
>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>
>>>>>>>>   public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>
>>>>>>>>   @JsonProperty("field_1")
>>>>>>>>   private String field1;
>>>>>>>>
>>>>>>>>   @JsonProperty("field_2")
>>>>>>>>   private String field2;
>>>>>>>>
>>>>>>>>   @Nullable
>>>>>>>>   @JsonProperty("field_3")
>>>>>>>>   private Integer field3;
>>>>>>>>
>>>>>>>>   .....
>>>>>>>>
>>>>>>>>   @JsonProperty("field_1")
>>>>>>>>   public String getField1() {
>>>>>>>>     return field1;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @JsonProperty("field_1")
>>>>>>>>   public void setField1(String field1) {
>>>>>>>>     this.field1 = field1;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @JsonProperty("field_2")
>>>>>>>>   public String getField2() {
>>>>>>>>     return field2;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @JsonProperty("field_2")
>>>>>>>>   public void setField2(String field2) {
>>>>>>>>     this.field2 = field2;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @JsonProperty("field_3")
>>>>>>>>   public Integer getField3() {
>>>>>>>>     return field3;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @JsonProperty("field_3")
>>>>>>>>   public void setField3(Integer field3) {
>>>>>>>>     this.field3 = field3;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   public POJO() {
>>>>>>>>     // 0-arg empty constructor
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   public POJO(POJO other) {
>>>>>>>>     // ... copy constructor
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Can you paste the code for your POJO?
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning an
>>>>>>>>>> empty schema.
>>>>>>>>>> My POJO is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>>>>>>> Is there something extra I need to do here to register my
>>>>>>>>>> class with the schema registry?
>>>>>>>>>> Note: the code which is building the pipeline is sitting in a
>>>>>>>>>> library (a different package) which is being imported into my
>>>>>>>>>> pipeline code. So perhaps there is some configuration missing
>>>>>>>>>> which allows the framework to discover my POJO and the
>>>>>>>>>> annotations associated with it.
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think I can make it work now. I found a utility method
>>>>>>>>>>>>> for building my coder from the class.
>>>>>>>>>>>>> Something like
>>>>>>>>>>>>>
>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>> Coder<Data> dataCoder = SchemaCoder.of(
>>>>>>>>>>>>>     schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>     TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>     schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>     schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>
>>>>>>>>>>>> This will work. Though, did annotating the POJO like I said
>>>>>>>>>>>> not work?
>>>>>>>>>>>>
>>>>>>>>>>> No, annotation alone does not work, since I am not using
>>>>>>>>>>> concrete classes in the code where the pipeline is being
>>>>>>>>>>> constructed. <Data> above is a type variable in the class
>>>>>>>>>>> which is constructing the pipeline.
>>>>>>>>>>>
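A sketch of how this can be wired into a generic pipeline builder (the
buildSchemaCoder helper is hypothetical). The registry lookups throw the
checked NoSuchSchemaException when no schema can be inferred for the
class, which is worth surfacing loudly at construction time:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.TypeDescriptor;

class Coders {
  // Builds a schema-aware coder for any class the registry knows about.
  static <T> Coder<T> buildSchemaCoder(Pipeline pipeline, Class<T> clazz) {
    SchemaRegistry registry = pipeline.getSchemaRegistry();
    try {
      return SchemaCoder.of(
          registry.getSchema(clazz),
          TypeDescriptor.of(clazz),
          registry.getToRowFunction(clazz),
          registry.getFromRowFunction(clazz));
    } catch (NoSuchSchemaException e) {
      throw new IllegalArgumentException(
          "No schema found for " + clazz
              + "; is it annotated with @DefaultSchema?",
          e);
    }
  }
}

The generic builder can then do
stream.setCoder(buildSchemaCoder(pipeline, userConfig.getDataClass()));
and fail fast when a user-provided class lacks schema annotations.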
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> No Coder has been manually specified; you may do so using
>>>>>>>>>>>>>> .setCoder().
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to
>>>>>>>>>>>>>> provide a Coder for Data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The reason is that the code which is building the pipeline
>>>>>>>>>>>>>> is based on Java generics. The actual pipeline-building
>>>>>>>>>>>>>> code sets a bunch of parameters which are used to
>>>>>>>>>>>>>> construct the pipeline:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PCollection<Data> stream = pipeline
>>>>>>>>>>>>>>     .apply(userProvidedTransform)
>>>>>>>>>>>>>>     .get(outputTag)
>>>>>>>>>>>>>>     .setCoder(userProvidedCoder);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So I guess I will need to provide some more information to
>>>>>>>>>>>>>> the framework to make the annotation work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you annotate your POJO with
>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class), that will usually
>>>>>>>>>>>>>>> set up schema inference automatically (you'll have to
>>>>>>>>>>>>>>> remove the setCoder call).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema
>>>>>>>>>>>>>>>> encoding? In my current code I am doing something like
>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> PCollection<Data> stream = pipeline
>>>>>>>>>>>>>>>>     .apply(someTransform)
>>>>>>>>>>>>>>>>     .get(outputTag)
>>>>>>>>>>>>>>>>     .setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't think we make any guarantees about AvroCoder.
>>>>>>>>>>>>>>>>> Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests if
>>>>>>>>>>>>>>>>>> incompatible changes are made to the POJO.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO
>>>>>>>>>>>>>>>>>>> before and after the change to ensure that they are
>>>>>>>>>>>>>>>>>>> compatible as you expect.
>>>>>>>>>>>>>>>>>>>
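One way to automate that check in a unit test, comparing a schema snapshot
checked into the repository against the current one with Avro's
SchemaCompatibility (the /old-pojo.avsc resource and test names here are
hypothetical):

import static org.junit.Assert.assertEquals;

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.beam.sdk.coders.AvroCoder;
import org.junit.Test;

public class PojoSchemaCompatibilityTest {
  @Test
  public void newSchemaCanReadDataWrittenWithOldSchema() throws Exception {
    // Snapshot of the schema the running pipeline was launched with.
    Schema oldSchema = new Schema.Parser()
        .parse(getClass().getResourceAsStream("/old-pojo.avsc"));
    // Schema Beam derives for the POJO as currently compiled.
    Schema newSchema = AvroCoder.of(POJO.class).getSchema();

    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(
            newSchema /* reader */, oldSchema /* writer */);
    assertEquals(SchemaCompatibilityType.COMPATIBLE, result.getType());
  }
}

A green test here only guards Avro's schema-resolution rules; it does not
by itself guarantee that Dataflow's update compatibility check, which
compares the coders themselves, will pass.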
>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question, I guess, but I
>>>>>>>>>>>>>>>>>>>> am asking here in the hope that someone has faced a
>>>>>>>>>>>>>>>>>>>> similar problem and can help.
>>>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a
>>>>>>>>>>>>>>>>>>>> running Dataflow job. I am noticing that
>>>>>>>>>>>>>>>>>>>> compatibility checks fail any time I add a new field
>>>>>>>>>>>>>>>>>>>> to my data model. The error says
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am using a Java POJO for the data, with AvroCoder
>>>>>>>>>>>>>>>>>>>> to serialize the model. I read somewhere that adding
>>>>>>>>>>>>>>>>>>>> new optional fields to the data should work when
>>>>>>>>>>>>>>>>>>>> updating the pipeline.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am fine with changing the coder or the
>>>>>>>>>>>>>>>>>>>> implementation of the model to something which
>>>>>>>>>>>>>>>>>>>> allows me to update the pipeline when I add new
>>>>>>>>>>>>>>>>>>>> optional fields to the existing model. Any
>>>>>>>>>>>>>>>>>>>> suggestions?
