That DelegateCoder approach seems like a good idea to keep the DoFn code clean. In the short term I will probably use a JSON string as my intermediary and let Jackson do the serialization and deserialization. From what I have learned so far, Dataflow does some magic in the background to re-order fields so that the PCollection schema stays compatible with the previous job (if I use SchemaCoder), so I don't have to switch away from Java beans yet. I can use the DelegateCoder for my StateSpec and continue using SchemaCoder for PCollections.
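Concretely, the plan could look like this inside the stateful DoFn - a minimal sketch, where MyState is a stand-in for my state POJO (which already carries @JsonIgnoreProperties(ignoreUnknown = true)):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.DelegateCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;

    // Static, so the coder's conversion functions don't capture (and try to
    // serialize) the mapper itself.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // State is stored as a UTF-8 JSON string; with ignoreUnknown = true on
    // the POJO, fields can be added or removed and field order doesn't matter.
    private static final Coder<MyState> STATE_CODER =
        DelegateCoder.of(
            StringUtf8Coder.of(),
            (MyState state) -> MAPPER.writeValueAsString(state),
            (String json) -> MAPPER.readValue(json, MyState.class));

    @StateId("prevState")
    private final StateSpec<ValueState<MyState>> prevState = StateSpecs.value(STATE_CODER);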
@Reuven Regarding timers in a stateful DoFn, do they get carried over to the new job in Dataflow?

On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]> wrote:

> Hello Gaurav,
>
> It looks like the issues related to state evolution are something quite recurrent in this mailing list, and do not seem to be covered in the Beam docs, unfortunately.
>
> I am not familiar with the Google Dataflow runner, but the way I achieved state evolution on the Flink runner was a combo of:
> 1 - version-controlled Avro schemas for generating POJOs (models) which were used in stateful Beam DoFns
> 2 - a generic Avro-generated POJO as an intermediary - ( schema: string, payload: bytes )
> 3 - https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html responsible for converting a model into the intermediary (2)
>
> This allowed adding & dropping nullable fields and making other Avro-forward-compatible changes without losing the state. The order of fields also didn't matter in this setting. It did mean double serialization and deserialization, unfortunately, but I wasn't able to come up with a better solution, given that there seemed to be no public info available about the topic - or maybe that was due to my lack of experience.
>
> Relevant mail threads:
> https://www.mail-archive.com/[email protected]/msg07249.html
>
> Best Regards,
> Pavel Solomin
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>
>
> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <[email protected]> wrote:
>
>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
>>
>>> Unfortunately, as you've discovered, Dataflow's update only pays attention to actual PCollections, and does not look at DoFn state variables at all (historically, stateful DoFns did not yet exist when Dataflow implemented its update support).
>>>
>>> You can file a feature request against Google to implement update on state variables, as this is honestly a major feature gap. In the meantime there might be some workarounds you can use until this support is added, e.g.:
>>> - you could convert the data into a protocol buffer before storing it into state; ProtoCoder should handle new fields, and fields in a protocol buffer have a deterministic order.
>>
>> Is there an easy way to go from a Java object to a protocol buffer object? The only way I can think of is to first serialize the Java model to JSON and then deserialize the JSON into a protobuf object, and the same for the other way around - protobuf to Java object. While I was writing this I realized I could instead store the JSON dump of my Java object in the state spec. That should also work.
>>
>>> - you could explicitly use the Row object, although this will again make your code more complex.
>>>
>>> Another intermediate option - we could fairly easily add a @SchemaFieldId annotation to Beam that would allow you to specify the order of fields. You would have to annotate every getter, something like this:
>>>
>>> @SchemaFieldId(0)
>>> public T getV1();
>>>
>>> @SchemaFieldId(1)
>>> public T getV2();
>>
>> Yes please. If this is an easy change and if you can point me in the right direction, I will be happy to submit a patch for this.
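For the record, the JSON leg of that Java-object-to-proto conversion could be done with protobuf-java-util's JsonFormat, so Jackson plus JsonFormat covers both directions. A minimal sketch - MyModel and MyModelProto are hypothetical names, and it assumes the proto's JSON field names line up with what Jackson emits:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.protobuf.util.JsonFormat;

    ObjectMapper mapper = new ObjectMapper();

    // Java POJO -> JSON -> proto
    String json = mapper.writeValueAsString(pojo);
    MyModelProto.Builder builder = MyModelProto.newBuilder();
    JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
    MyModelProto proto = builder.build();

    // proto -> JSON -> Java POJO
    MyModel roundTripped = mapper.readValue(JsonFormat.printer().print(proto), MyModel.class);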
>>> etc.
>>>
>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> SerializableCoder can be tricky for updates, and can also be very inefficient.
>>>>
>>>> As you noticed, when inferring schemas from a Java class, the order of fields is non-deterministic (not much that we can do about this, as Java doesn't return the fields in a deterministic order). For regular PCollections, Dataflow (though currently only Dataflow) will allow an update even though the field order has changed. Unfortunately
>>>>
>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <[email protected]> wrote:
>>>>
>>>>> I now have SchemaCoder for all data flowing through the pipeline. I am noticing another issue once a new pipeline is started with the --update flag. In my stateful DoFn where I am doing prevState.read(), exceptions are being thrown, errors like:
>>>>> Caused by: java.io.EOFException: reached end of stream after reading 68 bytes; 101 bytes expected
>>>>> In this case there was no change in code or data model when relaunching the pipeline.
>>>>>
>>>>> It seems that the Schema for my bean is changing on every run of the pipeline. I manually inspected the Schema returned by the SchemaRegistry by running the pipeline a few times locally, and each time I noticed the order of fields in the Schema was different. This changing order of fields is breaking the new pipeline. Is there a way to enforce some kind of ordering on the fields?
>>>>>
>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <[email protected]> wrote:
>>>>>
>>>>>> Looking at the source code for JavaBeanSchema, it seems I will have to annotate my getters and setters with some flavour of @Nullable.
>>>>>> Another question - does the Dataflow update pipeline feature work only with SchemaCoder? I have SerializableCoder and ProtoCoder being used in some of my other pipelines. I wonder if I have to change those too to open the door for updating pipelines there?
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]> wrote:
>>>>>>
>>>>>>> Some progress made. Now I can see that the schema has all the needed fields, but all fields in the returned schema are marked as "NOT NULL", even though the fields are annotated with @javax.annotation.Nullable. Is there a different flavor of @Nullable I need to use on my fields?
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Ah, this isn't a POJO then. In this case, try using @DefaultSchema(JavaBeanSchema.class).
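With JavaBeanSchema, the @Nullable annotation has to be visible on the getter, not just on the private field. A minimal sketch of that pattern (the class and field names are illustrative, and this assumes javax.annotation.Nullable is one of the flavours Beam recognizes):

    import java.io.Serializable;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    @DefaultSchema(JavaBeanSchema.class)
    public class Model implements Serializable {
      @Nullable private Integer field3;

      // Annotating only the field leaves the schema field NOT NULL;
      // the getter (and setter parameter) carry the nullability here.
      @Nullable
      public Integer getField3() { return field3; }

      public void setField3(@Nullable Integer field3) { this.field3 = field3; }

      public Model() {}
    }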
>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>>>>
>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>> @EqualsAndHashCode
>>>>>>>>> @ToString
>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>
>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>     private String field1;
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>     private String field2;
>>>>>>>>>
>>>>>>>>>     @Nullable
>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>     private Integer field3;
>>>>>>>>>
>>>>>>>>>     .....
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>     public String getField1() { return field1; }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>     public void setField1(String field1) { this.field1 = field1; }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>     public String getField2() { return field2; }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>     public void setField2(String field2) { this.field2 = field2; }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>     public Integer getField3() { return field3; }
>>>>>>>>>
>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>     public void setField3(Integer field3) { this.field3 = field3; }
>>>>>>>>>
>>>>>>>>>     public POJO() {
>>>>>>>>>         // zero-arg constructor
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>         // ... copy constructor
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning an empty schema. My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class). Is there something extra I need to do here to register my class with the schema registry?
>>>>>>>>>>> Note: the code which builds the pipeline sits in a library (a different package) which is imported into my pipeline code, so perhaps some configuration is missing which would allow the framework to discover my Pojo and the annotations associated with it.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think I can make it work now. I found a utility method for building my coder from the class. Something like:
>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>> Coder<Data> dataCoder = SchemaCoder.of(
>>>>>>>>>>>>>>     schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>     TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>     schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>     schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>
>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I said not work?
>>>>>>>>>>>>
>>>>>>>>>>>> No, annotation alone does not work, since I am not using concrete classes in the code where the pipeline is being constructed. <Data> above is a type variable in the class which constructs the pipeline.
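Spelled out with its imports and the checked exception it can throw, that registry-based construction could look like the following - a sketch; the helper method and its error handling are illustrative, not from the thread:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.schemas.NoSuchSchemaException;
    import org.apache.beam.sdk.schemas.SchemaCoder;
    import org.apache.beam.sdk.schemas.SchemaRegistry;
    import org.apache.beam.sdk.values.TypeDescriptor;

    static <T> Coder<T> schemaCoderFor(Pipeline pipeline, Class<T> clazz) {
      SchemaRegistry registry = pipeline.getSchemaRegistry();
      try {
        // getSchema and friends throw NoSuchSchemaException when no schema
        // was registered or can be inferred for clazz (e.g. a missing
        // @DefaultSchema annotation on the class).
        return SchemaCoder.of(
            registry.getSchema(clazz),
            TypeDescriptor.of(clazz),
            registry.getToRowFunction(clazz),
            registry.getFromRowFunction(clazz));
      } catch (NoSuchSchemaException e) {
        throw new IllegalArgumentException("No schema available for " + clazz, e);
      }
    }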
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for Data.
>>>>>>>>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reason is that the code building the pipeline is based on Java generics. The actual pipeline-building code sets a bunch of parameters which are used to construct the pipeline:
>>>>>>>>>>>>>>> PCollection<Data> stream = pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>>>> So I guess I will need to provide some more information to the framework to make the annotation work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class), that will usually automatically set up schema inference (you'll have to remove the setCoder call).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding? In my current code I am doing something like this:
>>>>>>>>>>>>>>>>> PCollection<Data> = pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about AvroCoder. Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for compatibility? I would like to fail my unit tests if incompatible changes are made to the Pojo.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before and after the change to ensure that they are compatible as you expect.
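That check can be automated in a unit test. A minimal JUnit sketch, assuming a "golden" schema file (pojo-v1.avsc is a made-up path) generated from the previously released POJO and checked into the repo:

    import static org.junit.Assert.assertEquals;

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaCompatibility;
    import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.junit.Test;

    public class PojoSchemaCompatibilityTest {
      @Test
      public void newSchemaCanStillReadOldData() throws Exception {
        Schema oldSchema = new Schema.Parser().parse(new File("src/test/resources/pojo-v1.avsc"));
        Schema newSchema = AvroCoder.of(POJO.class).getSchema();
        // The new (reader) schema must be able to decode bytes written with
        // the old (writer) schema, or the updated job cannot read old state.
        assertEquals(
            SchemaCompatibilityType.COMPATIBLE,
            SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema).getType());
      }
    }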
>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question, I guess, but I am asking here in hopes that someone has faced a similar problem and can help. I am trying to use the "--update" option to update a running Dataflow job. I am noticing that the compatibility checks fail any time I add a new field to my data model. The error says:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for the data and an Avro coder to serialize the model. I read somewhere that adding new optional fields to the data should work when updating the pipeline.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I am fine with changing the coder or the implementation of the model to something which allows me to update the pipeline when I add new optional fields to the existing model. Any suggestions?
