So does that mean my only option is to move away from using Java classes and use PCollection<Row> everywhere if I want to be able to update my pipeline?
On Mon, Jan 10, 2022 at 12:06 AM Reuven Lax <[email protected]> wrote:

> SerializableCoder can be tricky for updates, and can also be very
> inefficient.
>
> As you noticed, when inferring schemas from a Java class, the order of
> fields is non-deterministic (not much that we can do about this, as Java
> doesn't return the fields in a deterministic order). For regular
> PCollections, Dataflow (though currently only Dataflow) will allow update
> even though the field orders have changed. Unfortunately
>
> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <[email protected]> wrote:
>
>> I now have SchemaCoder for all data flowing through the pipeline.
>> I am noticing another issue once a new pipeline is started with the
>> --update flag. In my stateful DoFn where I am doing prevState.read(),
>> exceptions are being thrown, errors like:
>>
>> Caused by: java.io.EOFException: reached end of stream after reading 68
>> bytes; 101 bytes expected
>>
>> In this case there was no change in code or data model when relaunching
>> the pipeline.
>>
>> It seems that the Schema for my Bean is changing on every run of the
>> pipeline. I manually inspected the Schema returned by the SchemaRegistry
>> by running the pipeline a few times locally, and each time I noticed the
>> order of fields in the Schema was different. This changing order of
>> fields is breaking the new pipeline. Is there a way to enforce some kind
>> of ordering on the fields?
>>
>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <[email protected]> wrote:
>>
>>> Looking at the source code for JavaBeanSchema, it seems I will have to
>>> annotate my getters and setters with some flavour of @Nullable.
>>> Another question -
>>> does the Dataflow update-pipeline feature work only with SchemaCoder? I
>>> have SerializableCoder and ProtoCoder being used in some of my other
>>> pipelines. I wonder if I have to change those too to open the door to
>>> updating those pipelines?
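[Editor's note: one way to enforce a stable field order, sketched here as an illustration rather than taken from this thread, is Beam's @SchemaFieldNumber annotation, which assigns explicit positions to fields during schema inference. The class and field names below are hypothetical.]

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;

// Explicit field numbers make the inferred schema's field order stable
// across JVM runs, so an --update no longer sees a reordered schema.
@DefaultSchema(JavaFieldSchema.class)
public class Event {
  @SchemaFieldNumber("0")
  public String id;

  @SchemaFieldNumber("1")
  public String payload;

  @SchemaFieldNumber("2")
  public @javax.annotation.Nullable Integer retries;
}
```

With the numbers pinned, reflection order no longer matters; renumbering existing fields, however, is itself an incompatible change, so numbers should only ever be appended.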
>>>
>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]> wrote:
>>>
>>>> Some progress made.
>>>> Now I can see that the schema has all the needed fields. But all fields
>>>> in the returned schema are marked as "NOT NULL", even though the fields
>>>> are annotated with @javax.annotation.Nullable.
>>>> Is there a different flavor of @Nullable I need to use on my fields?
>>>>
>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>
>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:
>>>>>
>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>
>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>> @EqualsAndHashCode
>>>>>> @ToString
>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>
>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>
>>>>>>     @JsonProperty("field_1")
>>>>>>     private String field1;
>>>>>>
>>>>>>     @JsonProperty("field_2")
>>>>>>     private String field2;
>>>>>>
>>>>>>     @Nullable
>>>>>>     @JsonProperty("field_3")
>>>>>>     private Integer field3;
>>>>>>
>>>>>>     .....
>>>>>>
>>>>>>     @JsonProperty("field_1")
>>>>>>     public String getField1() {
>>>>>>         return field1;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_1")
>>>>>>     public void setField1(String field1) {
>>>>>>         this.field1 = field1;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_2")
>>>>>>     public String getField2() {
>>>>>>         return field2;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_2")
>>>>>>     public void setField2(String field2) {
>>>>>>         this.field2 = field2;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_3")
>>>>>>     public Integer getField3() {
>>>>>>         return field3;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_3")
>>>>>>     public void setField3(Integer field3) {
>>>>>>         this.field3 = field3;
>>>>>>     }
>>>>>>
>>>>>>     public POJO() {
>>>>>>         // 0-arg empty constructor
>>>>>>     }
>>>>>>
>>>>>>     public POJO(POJO other) {
>>>>>>         // ... copy constructor
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Can you paste the code for your Pojo?
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:
>>>>>>>
>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning an
>>>>>>>> empty schema.
>>>>>>>> My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>>>>> Is there something extra I need to do here to register my class
>>>>>>>> with the schema registry?
>>>>>>>> Note: the code which is building the pipeline sits in a library
>>>>>>>> (different package) which is imported into my pipeline code.
>>>>>>>> So perhaps there is some configuration missing which allows the
>>>>>>>> framework to discover my Pojo and the annotations associated with it.
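[Editor's note: since the class above has getters and setters, JavaBeanSchema applies, and it reads nullability from the accessors rather than the fields. A minimal sketch of the annotation placement, with hypothetical names:]

```java
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@DefaultSchema(JavaBeanSchema.class)
public class Bean {
  private Integer field3;

  // JavaBeanSchema infers nullability from the getter's annotation,
  // so the inferred schema marks field3 as NULLABLE instead of NOT NULL.
  @Nullable
  public Integer getField3() {
    return field3;
  }

  public void setField3(@Nullable Integer field3) {
    this.field3 = field3;
  }
}
```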
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>>>>> building my coder from the class. Something like:
>>>>>>>>>>>
>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>> Coder<Data> dataCoder = SchemaCoder.of(
>>>>>>>>>>>     schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>     TypeDescriptor.of(dataClass),
>>>>>>>>>>>     schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>     schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>
>>>>>>>>>> This will work. Though, did annotating the POJO like I said not
>>>>>>>>>> work?
>>>>>>>>>
>>>>>>>>> No, annotation alone does not work, since I am not using concrete
>>>>>>>>> classes in the code where the pipeline is being constructed. <Data>
>>>>>>>>> above is a type variable in the class which is constructing the
>>>>>>>>> pipeline.
>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>>>>>
>>>>>>>>>>>> No Coder has been manually specified; you may do so using
>>>>>>>>>>>> .setCoder().
>>>>>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to
>>>>>>>>>>>> provide a Coder for Data.
>>>>>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>
>>>>>>>>>>>> The reason is that the code which builds the pipeline is based
>>>>>>>>>>>> on Java generics. The actual pipeline-building code sets a bunch
>>>>>>>>>>>> of parameters which are used to construct the pipeline.
>>>>>>>>>>>> PCollection<Data> stream = pipeline
>>>>>>>>>>>>     .apply(userProvidedTransform)
>>>>>>>>>>>>     .get(outputTag)
>>>>>>>>>>>>     .setCoder(userProvidedCoder);
>>>>>>>>>>>>
>>>>>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>>>>>> framework to make the annotation work.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> If you annotate your POJO with
>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class), that will usually
>>>>>>>>>>>>> automatically set up schema inference (you'll have to remove the
>>>>>>>>>>>>> setCoder call).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PCollection<Data> stream = pipeline
>>>>>>>>>>>>>>     .apply(someTransform)
>>>>>>>>>>>>>>     .get(outputTag)
>>>>>>>>>>>>>>     .setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think we make any guarantees about AvroCoder. Can
>>>>>>>>>>>>>>> you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is there a way to programmatically check for compatibility?
>>>>>>>>>>>>>>>> I would like to fail my unit tests if incompatible changes
>>>>>>>>>>>>>>>> are made to the Pojo.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before
>>>>>>>>>>>>>>>>> and after the change to ensure that they are compatible as
>>>>>>>>>>>>>>>>> you expect.
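[Editor's note: for the generic case above, a more compact alternative to building the SchemaCoder by hand is the registry's coder lookup. A sketch, assuming `Data`, `userProvidedTransform`, and `outputTag` from the thread, and that SchemaRegistry.getSchemaCoder is available in the Beam version in use:]

```java
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.PCollection;

SchemaRegistry registry = pipeline.getSchemaRegistry();

// Throws NoSuchSchemaException if no schema can be registered or
// inferred for Data (e.g. the @DefaultSchema annotation is missing).
Coder<Data> dataCoder = registry.getSchemaCoder(Data.class);

PCollection<Data> stream = pipeline
    .apply(userProvidedTransform)
    .get(outputTag)
    .setCoder(dataCoder);
```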
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is more of a Dataflow question, I guess, but asking
>>>>>>>>>>>>>>>>>> here in hopes someone has faced a similar problem and can
>>>>>>>>>>>>>>>>>> help.
>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a running
>>>>>>>>>>>>>>>>>> Dataflow job. I am noticing that compatibility checks fail
>>>>>>>>>>>>>>>>>> any time I add a new field to my data model. The error says:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am using a Java Pojo for the data and AvroCoder to
>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding new
>>>>>>>>>>>>>>>>>> optional fields to the data should work when updating the
>>>>>>>>>>>>>>>>>> pipeline.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am fine with changing the coder or the implementation of
>>>>>>>>>>>>>>>>>> the model to something which allows me to update the
>>>>>>>>>>>>>>>>>> pipeline in cases where I add new optional fields to the
>>>>>>>>>>>>>>>>>> existing model. Any suggestions?
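[Editor's note: Luke's before/after check can be automated in a unit test with Avro's SchemaCompatibility API. A sketch, assuming Avro encoding and a golden schema file checked into the repo; the resource path and class names are hypothetical:]

```java
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.reflect.ReflectData;
import org.junit.Assert;
import org.junit.Test;

public class SchemaCompatTest {

  @Test
  public void newSchemaCanReadOldData() throws Exception {
    // The schema the running pipeline was started with, checked in as a
    // golden file and regenerated on each intentional model change.
    Schema old;
    try (InputStream in = getClass().getResourceAsStream("/golden/POJO.avsc")) {
      old = new Schema.Parser().parse(in);
    }

    // The schema derived from the current code (AvroCoder uses reflection).
    Schema current = ReflectData.get().getSchema(POJO.class);

    // Fails the build if the current schema cannot read data written
    // with the old schema, i.e. if an incompatible change was made.
    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(current, old);
    Assert.assertEquals(SchemaCompatibilityType.COMPATIBLE, result.getType());
  }
}
```

Note that Avro-level compatibility is necessary but not sufficient for a Dataflow update; the coder bytes must match exactly, which is why the thread moves toward schema encoding.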
