Some progress made. Now I can see that the schema has all the needed fields, but all fields in the returned schema are marked as "NOT NULL", even though the fields are annotated with @javax.annotation.Nullable. Is there a different flavor of @Nullable I need to use on my fields?
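[A sketch of one thing to try here, under an assumption worth verifying against your Beam version: JavaBeanSchema reflects over the getter/setter methods, so it may only see @Nullable when the annotation is on the accessors rather than on the private fields.]

    import java.io.Serializable;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    @DefaultSchema(JavaBeanSchema.class)
    public class POJO implements Serializable {

        private Integer field3;

        // Assumption: JavaBeanSchema derives nullability from the getter,
        // so the annotation lives here instead of (or in addition to) the field.
        @Nullable
        public Integer getField3() {
            return field3;
        }

        public void setField3(@Nullable Integer field3) {
            this.field3 = field3;
        }
    }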
On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:

> Ah, this isn't a POJO then. In this case, try using
> @DefaultSchema(JavaBeanSchema.class).
>
> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:
>
>> It looks something like this. It currently has around 40 fields.
>>
>> @JsonIgnoreProperties(ignoreUnknown = true)
>> @DefaultSchema(JavaFieldSchema.class)
>> @EqualsAndHashCode
>> @ToString
>> public class POJO implements SomeInterface, Serializable {
>>
>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>
>>     @JsonProperty("field_1")
>>     private String field1;
>>
>>     @JsonProperty("field_2")
>>     private String field2;
>>
>>     @Nullable
>>     @JsonProperty("field_3")
>>     private Integer field3;
>>
>>     .....
>>
>>     @JsonProperty("field_1")
>>     public String getField1() {
>>         return field1;
>>     }
>>
>>     @JsonProperty("field_1")
>>     public void setField1(String field1) {
>>         this.field1 = field1;
>>     }
>>
>>     @JsonProperty("field_2")
>>     public String getField2() {
>>         return field2;
>>     }
>>
>>     @JsonProperty("field_2")
>>     public void setField2(String field2) {
>>         this.field2 = field2;
>>     }
>>
>>     @JsonProperty("field_3")
>>     public Integer getField3() {
>>         return field3;
>>     }
>>
>>     @JsonProperty("field_3")
>>     public void setField3(Integer field3) {
>>         this.field3 = field3;
>>     }
>>
>>     public POJO() {
>>         // 0-arg empty constructor
>>     }
>>
>>     public POJO(POJO other) {
>>         // ... copy constructor
>>     }
>> }
>>
>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>
>>> Can you paste the code for your Pojo?
>>>
>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:
>>>
>>>> schemaRegistry.getSchema(dataType) for me is returning an empty schema.
>>>> My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>> Is there something extra I need to do here to register my class with
>>>> the schema registry?
>>>> Note: the code which builds the pipeline sits in a library (different
>>>> package) which is imported into my pipeline code. So perhaps there is
>>>> some configuration missing which allows the framework to discover my
>>>> Pojo and the annotations associated with it.
>>>>
>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:
>>>>
>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:
>>>>>>
>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>> building my coder from the class. Something like:
>>>>>>>
>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>> Coder<Data> dataCoder =
>>>>>>>     SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>         TypeDescriptor.of(dataClass),
>>>>>>>         schemaRegistry.getToRowFunction(dataClass),
>>>>>>>         schemaRegistry.getFromRowFunction(dataClass));
>>>>>>
>>>>>> This will work. Though, did annotating the POJO like I said not work?
>>>>>
>>>>> No, the annotation alone does not work since I am not using concrete
>>>>> classes in the code where the pipeline is being constructed. <Data>
>>>>> above is a generic type parameter of the class which constructs the
>>>>> pipeline.
>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>
>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>
>>>>>>>> No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to provide
>>>>>>>> a Coder for Data.
>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>
>>>>>>>> The reason is that the code which builds the pipeline is based on
>>>>>>>> Java generics. The actual pipeline-building code sets a bunch of
>>>>>>>> parameters which are used to construct the pipeline:
>>>>>>>>
>>>>>>>> PCollection<Data> stream =
>>>>>>>>     pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder);
>>>>>>>>
>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>> framework to make the annotation work.
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class),
>>>>>>>>> that will usually automatically set up schema inference (you'll
>>>>>>>>> have to remove the setCoder call).
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>
>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>     pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I don't think we make any guarantees about AvroCoder. Can you
>>>>>>>>>>> use Beam's schema encoding instead?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Is there a way to programmatically check for compatibility? I
>>>>>>>>>>>> would like to fail my unit tests if incompatible changes are
>>>>>>>>>>>> made to the Pojo.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before and
>>>>>>>>>>>>> after the change to ensure that they are compatible as you
>>>>>>>>>>>>> expect.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is more of a Dataflow question I guess, but I am asking
>>>>>>>>>>>>>> here in hopes someone has faced a similar problem and can
>>>>>>>>>>>>>> help. I am trying to use the "--update" option to update a
>>>>>>>>>>>>>> running Dataflow job. I am noticing that compatibility checks
>>>>>>>>>>>>>> fail any time I add a new field to my data model. The error
>>>>>>>>>>>>>> says:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am using a Java Pojo for the data and AvroCoder to serialize
>>>>>>>>>>>>>> the model. I read somewhere that adding new optional fields to
>>>>>>>>>>>>>> the data should work when updating the pipeline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am fine with updating the coder or the implementation of the
>>>>>>>>>>>>>> model to something which allows me to update the pipeline when
>>>>>>>>>>>>>> I add new optional fields to the existing model. Any
>>>>>>>>>>>>>> suggestions?
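
[To make the "check the Avro schema before and after" suggestion concrete: a minimal sketch using Avro's SchemaCompatibility API. The helper name, surrounding class, and goldenSchemaJson parameter are illustrative; the orientation shown assumes new code must still be able to read records written with the old schema.]

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaCompatibility;
    import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    import org.apache.avro.reflect.ReflectData;

    public class PojoSchemaCompatibilityCheck {
        // goldenSchemaJson: hypothetical golden schema captured before the
        // change, e.g. the contents of a .avsc file committed to the repo.
        static void checkCompatible(String goldenSchemaJson, Class<?> pojoClass) {
            Schema oldSchema = new Schema.Parser().parse(goldenSchemaJson);
            // Schema derived from the current POJO via reflection,
            // approximately as AvroCoder would see it.
            Schema newSchema = ReflectData.get().getSchema(pojoClass);
            // A new reader must be able to decode records written with the
            // old schema; fail the unit test otherwise.
            SchemaCompatibility.SchemaPairCompatibility result =
                SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
            if (result.getType() != SchemaCompatibilityType.COMPATIBLE) {
                throw new AssertionError(
                    "Incompatible schema change: " + result.getDescription());
            }
        }
    }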

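[And a self-contained version of the SchemaCoder construction that resolved the generics issue above, as a sketch: the helper class and method names are illustrative, while the registry calls are Beam's SchemaRegistry API as used in the thread.]

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.schemas.NoSuchSchemaException;
    import org.apache.beam.sdk.schemas.SchemaCoder;
    import org.apache.beam.sdk.schemas.SchemaRegistry;
    import org.apache.beam.sdk.values.TypeDescriptor;

    public class SchemaCoders {
        // Build a schema-based Coder<T> for a class known only at runtime
        // (e.g. supplied by user configuration), so that a generic
        // pipeline-building library can call setCoder() itself.
        public static <T> Coder<T> of(Pipeline pipeline, Class<T> clazz)
                throws NoSuchSchemaException {
            SchemaRegistry registry = pipeline.getSchemaRegistry();
            return SchemaCoder.of(
                registry.getSchema(clazz),
                TypeDescriptor.of(clazz),
                registry.getToRowFunction(clazz),
                registry.getFromRowFunction(clazz));
        }
    }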