Looking at the source code for JavaBeanSchema, it seems I will have to annotate my getters and setters with some flavour of @Nullable. Another question: does the Dataflow update-pipeline feature work only with SchemaCoder? I have SerializableCoder and ProtoCoder in use in some of my other pipelines. I wonder if I have to change those too to open the door to updating those pipelines?
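For reference, a minimal sketch of a bean set up for JavaBeanSchema inference (the class and field names here are hypothetical, and this assumes the SDK reads nullability from annotations on the getter/setter pair — worth verifying against your Beam version; recent SDKs accept any annotation whose simple name is "Nullable"):

```java
import java.io.Serializable;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.checkerframework.checker.nullness.qual.Nullable;

// Hypothetical bean: field1 stays NOT NULL in the inferred schema,
// field3 becomes nullable because its getter/setter are annotated.
@DefaultSchema(JavaBeanSchema.class)
public class DataBean implements Serializable {
  private String field1;
  private @Nullable Integer field3;

  public String getField1() { return field1; }
  public void setField1(String field1) { this.field1 = field1; }

  public @Nullable Integer getField3() { return field3; }
  public void setField3(@Nullable Integer field3) { this.field3 = field3; }
}
```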
On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]> wrote:

> Some progress made. Now I can see that the schema has all the needed
> fields, but all fields in the returned schema are marked as "NOT NULL",
> even though the fields are annotated with @javax.annotation.Nullable.
> Is there a different flavor of @Nullable I need to use on my fields?
>
> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>
>> Ah, this isn't a POJO then. In this case, try using
>> @DefaultSchema(JavaBeanSchema.class).
>>
>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:
>>
>>> It looks something like this. It currently has around 40 fields.
>>>
>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>> @DefaultSchema(JavaFieldSchema.class)
>>> @EqualsAndHashCode
>>> @ToString
>>> public class POJO implements SomeInterface, Serializable {
>>>
>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>
>>>     @JsonProperty("field_1")
>>>     private String field1;
>>>
>>>     @JsonProperty("field_2")
>>>     private String field2;
>>>
>>>     @Nullable
>>>     @JsonProperty("field_3")
>>>     private Integer field3;
>>>
>>>     // .....
>>>
>>>     @JsonProperty("field_1")
>>>     public String getField1() {
>>>         return field1;
>>>     }
>>>
>>>     @JsonProperty("field_1")
>>>     public void setField1(String field1) {
>>>         this.field1 = field1;
>>>     }
>>>
>>>     @JsonProperty("field_2")
>>>     public String getField2() {
>>>         return field2;
>>>     }
>>>
>>>     @JsonProperty("field_2")
>>>     public void setField2(String field2) {
>>>         this.field2 = field2;
>>>     }
>>>
>>>     @JsonProperty("field_3")
>>>     public Integer getField3() {
>>>         return field3;
>>>     }
>>>
>>>     @JsonProperty("field_3")
>>>     public void setField3(Integer field3) {
>>>         this.field3 = field3;
>>>     }
>>>
>>>     public POJO() {
>>>         // 0-arg empty constructor
>>>     }
>>>
>>>     public POJO(POJO other) {
>>>         // ... copy constructor
>>>     }
>>> }
>>>
>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Can you paste the code for your POJO?
>>>>
>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:
>>>>
>>>>> schemaRegistry.getSchema(dataType) for me is returning an empty schema.
>>>>> My POJO is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>> Is there something extra I need to do here to register my class with
>>>>> the schema registry?
>>>>> Note: the code which builds the pipeline sits in a library (a
>>>>> different package) which is imported into my pipeline code, so perhaps
>>>>> some configuration is missing which would allow the framework to
>>>>> discover my POJO and the annotations associated with it.
>>>>>
>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:
>>>>>
>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:
>>>>>>>
>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>> building my coder from the class. Something like:
>>>>>>>>
>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>> Coder<Data> dataCoder = SchemaCoder.of(
>>>>>>>>     schemaRegistry.getSchema(dataClass),
>>>>>>>>     TypeDescriptor.of(dataClass),
>>>>>>>>     schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>     schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>
>>>>>>> This will work. Though, did annotating the POJO like I said not
>>>>>>> work?
>>>>>>
>>>>>> No, annotation alone does not work, since I am not using concrete
>>>>>> classes in the code where the pipeline is constructed. <Data> above
>>>>>> is a type variable in the class which constructs the pipeline.
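The registry-based construction quoted above can be wrapped in a small generic helper, so that pipeline-building code parameterized on a type variable never needs a concrete class. A sketch (`schemaCoderFor` is a hypothetical name; it assumes the class passed in carries a schema annotation such as @DefaultSchema, otherwise the registry throws NoSuchSchemaException):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.TypeDescriptor;

public class SchemaCoders {
  // Hypothetical helper: builds a SchemaCoder for any class the pipeline's
  // schema registry knows about, keeping generic pipeline-construction code
  // independent of concrete types.
  public static <T> Coder<T> schemaCoderFor(Pipeline pipeline, Class<T> clazz)
      throws NoSuchSchemaException {
    SchemaRegistry registry = pipeline.getSchemaRegistry();
    return SchemaCoder.of(
        registry.getSchema(clazz),
        TypeDescriptor.of(clazz),
        registry.getToRowFunction(clazz),
        registry.getFromRowFunction(clazz));
  }
}
```

The result can then be passed to `.setCoder(...)` exactly as in the snippet above.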
>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>>
>>>>>>>>> No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for Data.
>>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>>
>>>>>>>>> The reason is that the code which builds the pipeline is based on
>>>>>>>>> Java generics. The actual pipeline-building code sets a bunch of
>>>>>>>>> parameters which are used to construct the pipeline:
>>>>>>>>>
>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>     pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder);
>>>>>>>>>
>>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>>> framework to make the annotation work.
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class),
>>>>>>>>>> that will usually automatically set up schema inference (you'll
>>>>>>>>>> have to remove the setCoder call).
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>
>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>     pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I don't think we make any guarantees about AvroCoder. Can you
>>>>>>>>>>>> use Beam's schema encoding instead?
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to programmatically check for compatibility? I
>>>>>>>>>>>>> would like to fail my unit tests if incompatible changes are
>>>>>>>>>>>>> made to the POJO.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before and
>>>>>>>>>>>>>> after the change to ensure that they are compatible as you
>>>>>>>>>>>>>> expect.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is more of a Dataflow question, I guess, but asking here
>>>>>>>>>>>>>>> in the hope that someone has faced a similar problem and can
>>>>>>>>>>>>>>> help. I am trying to use the "--update" option to update a
>>>>>>>>>>>>>>> running Dataflow job. I am noticing that compatibility checks
>>>>>>>>>>>>>>> fail any time I add a new field to my data model. The error
>>>>>>>>>>>>>>> says:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am using a Java POJO for the data and AvroCoder to serialize
>>>>>>>>>>>>>>> the model. I read somewhere that adding new optional fields to
>>>>>>>>>>>>>>> the data should work when updating the pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am fine with updating the coder or the implementation of the
>>>>>>>>>>>>>>> model to something which allows me to update the pipeline when
>>>>>>>>>>>>>>> I add new optional fields to the existing model. Any
>>>>>>>>>>>>>>> suggestions?
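For the unit-test question above, Avro itself ships a reader/writer compatibility checker that can be run in a test. A sketch using plain Avro (the schemas are built by hand here for illustration; in a real test you could capture them from AvroCoder.of(Data.class).getSchema() before and after the change):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

public class AvroCompatCheck {
  public static void main(String[] args) {
    // Writer schema: the POJO as it looked before the change.
    Schema oldSchema = SchemaBuilder.record("Data").fields()
        .requiredString("field1")
        .endRecord();

    // Reader schema: the POJO after adding an optional field.
    // optionalInt produces union(null, int) with default null.
    Schema newSchema = SchemaBuilder.record("Data").fields()
        .requiredString("field1")
        .optionalInt("field3")
        .endRecord();

    // Can code using the new schema read data written with the old one?
    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);

    // Compatible here, because the added field has a default value.
    if (result.getType() != SchemaCompatibilityType.COMPATIBLE) {
      throw new AssertionError("Schema change is not backward compatible");
    }
    System.out.println(result.getType());
  }
}
```

A required field added without a default would make the check fail, which matches the "--update" failures described above.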
