On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
> Unfortunately, as you've discovered, Dataflow's update only pays attention
> to actual PCollections, and does not look at DoFn state variables at all
> (historically, stateful DoFns did not yet exist when Dataflow implemented
> its update support).
>
> You can file a feature request against Google to implement update on state
> variables, as this is honestly a major feature gap. In the meantime there
> might be some workarounds you can use until this support is added. e.g.
> - you could convert the data into a protocol buffer before storing into
> state; ProtoCoder should handle new fields, and fields in a protocol buffer
> have a deterministic order.

Is there an easy way to go from a Java object to a protocol buffer object?
The only way I can think of is to first serialize the Java model to JSON and
then deserialize the JSON into a protocol buffer object. Same for the other
way around - protobuf to Java object.
While I was writing this I realized I can maybe store the JSON dump of my
Java object in the state spec. That should also work.

> - you could explicitly use the Row object, although this will again
> make your code more complex.
>
> Another intermediate option - we could fairly easily add a @SchemaFieldId
> annotation to Beam that would allow you to specify the order of fields. You
> would have to annotate every getter, something like this:
>
> @SchemaFieldId(0)
> public T getV1();
>
> @SchemaFieldId(1)
> public T getV2();

Yes please. If this is an easy change and if you can point me in the right
direction, I will be happy to submit a patch for this.

> etc.
>
>
> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>
>> SerializableCoder can be tricky for updates, and can also be very
>> inefficient.
>>
>> As you noticed, when inferring schemas from a Java class, the order of
>> fields is non-deterministic (not much that we can do about this, as Java
>> doesn't return the fields in a deterministic order). For regular
>> PCollections, Dataflow (though currently only Dataflow) will allow update
>> even though the field orders have changed. Unfortunately
>>
>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> I now have SchemaCoder for all data flowing through the pipeline.
>>> I am noticing another issue once a new pipeline is started with the
>>> --update flag. In my stateful DoFn where I am doing prevState.read(),
>>> exceptions are being thrown, errors like
>>>
>>> Caused by: java.io.EOFException: reached end of stream after reading 68
>>> bytes; 101 bytes expected
>>>
>>> In this case there was no change in code or data model when relaunching
>>> the pipeline.
>>>
>>> It seems that the Schema for my Bean is changing on every run of the
>>> pipeline. I manually inspected the Schema returned by the SchemaRegistry
>>> by running the pipeline a few times locally, and each time I noticed the
>>> order of fields in the Schema was different. This changing order of
>>> fields is breaking the new pipeline. Is there a way to enforce some kind
>>> of ordering on the fields?
>>>
>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> Looking at the source code for JavaBeanSchema, it seems I will have to
>>>> annotate my getters and setters with some flavour of @Nullable.
>>>> Another question -
>>>> does the Dataflow update pipeline feature work only with SchemaCoder? I
>>>> have SerializableCoder and ProtoCoder being used in some of my other
>>>> pipelines. I wonder if I have to change those too to open doors for
>>>> updating pipelines there?
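For reference, a sketch of what that getter annotation could look like
(untested; my reading of the JavaBeanSchema code is that Beam matches the
annotation by its simple name "Nullable", so the javax.annotation flavour on
the accessors should be picked up - but that part is an assumption):

    import java.io.Serializable;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    @DefaultSchema(JavaBeanSchema.class)
    public class POJO implements Serializable {
        private Integer field3;

        // Nullability is read from the accessors, not the private field.
        @Nullable
        public Integer getField3() {
            return field3;
        }

        public void setField3(@Nullable Integer field3) {
            this.field3 = field3;
        }
    }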
>>>>
>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> Some progress made.
>>>>> Now I can see that the schema has all the needed fields. But all fields
>>>>> in the returned schema are marked as "NOT NULL", even though the fields
>>>>> are annotated with @javax.annotation.Nullable.
>>>>> Is there a different flavor of @Nullable I need to use on my fields?
>>>>>
>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>>
>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>> @EqualsAndHashCode
>>>>>>> @ToString
>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>
>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>
>>>>>>>     @JsonProperty("field_1")
>>>>>>>     private String field1;
>>>>>>>
>>>>>>>     @JsonProperty("field_2")
>>>>>>>     private String field2;
>>>>>>>
>>>>>>>     @Nullable
>>>>>>>     @JsonProperty("field_3")
>>>>>>>     private Integer field3;
>>>>>>>
>>>>>>>     .....
>>>>>>>
>>>>>>>     @JsonProperty("field_1")
>>>>>>>     public String getField1() {
>>>>>>>         return field1;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_1")
>>>>>>>     public void setField1(String field1) {
>>>>>>>         this.field1 = field1;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_2")
>>>>>>>     public String getField2() {
>>>>>>>         return field2;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_2")
>>>>>>>     public void setField2(String field2) {
>>>>>>>         this.field2 = field2;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_3")
>>>>>>>     public Integer getField3() {
>>>>>>>         return field3;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_3")
>>>>>>>     public void setField3(Integer field3) {
>>>>>>>         this.field3 = field3;
>>>>>>>     }
>>>>>>>
>>>>>>>     public POJO() {
>>>>>>>         // 0-arg empty constructor
>>>>>>>     }
>>>>>>>
>>>>>>>     public POJO(POJO other) {
>>>>>>>         // ... copy constructor
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning an
>>>>>>>>> empty schema.
>>>>>>>>> My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>>>>>> Is there something extra I need to do here to register my class
>>>>>>>>> with the schema registry?
>>>>>>>>> Note: the code which is building the pipeline is sitting in a
>>>>>>>>> library (different package) which is being imported into my
>>>>>>>>> pipeline code. So perhaps there is some configuration which is
>>>>>>>>> missing which allows the framework to discover my Pojo and the
>>>>>>>>> annotations associated with it.
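If annotation discovery across the library boundary is the problem,
explicitly registering the class might be worth a try. A minimal sketch
(assuming Beam's SchemaRegistry API; registerJavaBean is the getter/setter
analogue of the field-based registerPOJO):

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.SchemaRegistry;

    SchemaRegistry registry = pipeline.getSchemaRegistry();
    // Register the class explicitly instead of relying on the
    // @DefaultSchema annotation being discovered.
    registry.registerPOJO(POJO.class);        // field-based schema
    // registry.registerJavaBean(POJO.class); // bean (getter/setter) schema
    Schema schema = registry.getSchema(POJO.class);
    // getSchema throws NoSuchSchemaException if the class is still unknown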
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>>>>>> building my coder from the class.
>>>>>>>>>>>> Something like
>>>>>>>>>>>>
>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>> Coder<Data> dataCoder = SchemaCoder.of(
>>>>>>>>>>>>     schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>     TypeDescriptor.of(dataClass),
>>>>>>>>>>>>     schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>     schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>
>>>>>>>>>>> This will work. Though, did annotating the POJO like I said not
>>>>>>>>>>> work?
>>>>>>>>>>
>>>>>>>>>> No, annotation alone does not work since I am not using concrete
>>>>>>>>>> classes in the code where the pipeline is being constructed.
>>>>>>>>>> <Data> above is a type parameter of the class which is
>>>>>>>>>> constructing the pipeline.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Removing the setCoder call breaks my pipeline, with errors like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> No Coder has been manually specified; you may do so using
>>>>>>>>>>>>> .setCoder().
>>>>>>>>>>>>>
>>>>>>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to
>>>>>>>>>>>>> provide a Coder for Data.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reason is that the code which is building the pipeline is
>>>>>>>>>>>>> based on Java generics. The actual pipeline-building code sets
>>>>>>>>>>>>> a bunch of parameters which are used to construct the pipeline:
>>>>>>>>>>>>>
>>>>>>>>>>>>> PCollection<Data> stream = pipeline.apply(userProvidedTransform)
>>>>>>>>>>>>>     .get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>>
>>>>>>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>>>>>>> framework to make the annotation work.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will usually
>>>>>>>>>>>>>> automatically set up schema inference (you'll have to remove
>>>>>>>>>>>>>> the setCoder call).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> PCollection<Data> = pipeline.apply(someTransform)
>>>>>>>>>>>>>>>     .get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't think we make any guarantees about AvroCoder. Can
>>>>>>>>>>>>>>>> you use Beam's schema encoding instead?
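Putting together the pieces from the messages above, the swap away from
AvroCoder is small once a schema is available - a sketch (assuming Data is
annotated with @DefaultSchema or registered with the SchemaRegistry, and
reusing the SchemaCoder.of(...) construction shown earlier in the thread;
note getSchema and friends throw the checked NoSuchSchemaException):

    SchemaRegistry registry = pipeline.getSchemaRegistry();
    PCollection<Data> stream = pipeline
        .apply(someTransform)
        .get(outputTag)
        // schema-based coder instead of AvroCoder.of(Data.class)
        .setCoder(SchemaCoder.of(
            registry.getSchema(Data.class),
            TypeDescriptor.of(Data.class),
            registry.getToRowFunction(Data.class),
            registry.getFromRowFunction(Data.class)));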
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests if
>>>>>>>>>>>>>>>>> incompatible changes are made to the Pojo.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before
>>>>>>>>>>>>>>>>>> and after the change to ensure that they are compatible
>>>>>>>>>>>>>>>>>> as you expect.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess, but I am
>>>>>>>>>>>>>>>>>>> asking here in hopes someone has faced a similar problem
>>>>>>>>>>>>>>>>>>> and can help.
>>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a
>>>>>>>>>>>>>>>>>>> running Dataflow job. I am noticing that compatibility
>>>>>>>>>>>>>>>>>>> checks fail any time I add a new field to my data model.
>>>>>>>>>>>>>>>>>>> The error says:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for the data and AvroCoder to
>>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding new
>>>>>>>>>>>>>>>>>>> optional fields to the data should work when updating
>>>>>>>>>>>>>>>>>>> the pipeline.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or the implementation
>>>>>>>>>>>>>>>>>>> of the model to something which allows me to update the
>>>>>>>>>>>>>>>>>>> pipeline in cases when I add new optional fields to the
>>>>>>>>>>>>>>>>>>> existing model. Any suggestions?
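On the programmatic-check question, Luke's suggestion above can be turned
into a unit test with Avro's built-in compatibility checker - a sketch (the
golden-file path and test name are made up; it assumes the POJO is still
encoded with AvroCoder and that the old schema JSON has been checked in):

    import static org.junit.Assert.assertEquals;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaCompatibility;
    import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.junit.Test;

    public class PojoSchemaCompatibilityTest {
        @Test
        public void newSchemaCanReadOldData() throws Exception {
            Schema newSchema = AvroCoder.of(POJO.class).getSchema();
            Schema oldSchema = new Schema.Parser().parse(
                getClass().getResourceAsStream("/golden/pojo-schema.avsc"));
            // The updated (reader) schema must be able to decode records
            // written with the old (writer) schema.
            assertEquals(
                SchemaCompatibilityType.COMPATIBLE,
                SchemaCompatibility
                    .checkReaderWriterCompatibility(newSchema, oldSchema)
                    .getType());
        }
    }

Note that passing this check means the Avro schemas are read-compatible; it
does not by itself guarantee that Dataflow's --update coder check will accept
the change, so it is a guard rail rather than a proof.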
