Looking at the source code for JavaBeanSchema, it seems I will have to
annotate my getters and setters with some flavour of @Nullable.
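
To illustrate what I mean, here is a minimal sketch in plain Java with no
Beam dependency. It assumes, based on my reading of the source, that
bean-style inference inspects the accessor methods rather than the private
fields. The Nullable annotation, the two bean classes and the
isNullableGetter helper are hypothetical stand-ins I made up for this
example, not Beam classes or Beam's actual lookup logic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical stand-in for any @Nullable flavour; not a Beam class.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
@interface Nullable {}

class NullableGetterSketch {

    // Bean whose nullable marker sits on the private field only.
    static class FieldAnnotated {
        @Nullable private Integer field3;
        public Integer getField3() { return field3; }
        public void setField3(Integer field3) { this.field3 = field3; }
    }

    // Bean whose nullable marker sits on the getter and the setter parameter.
    static class AccessorAnnotated {
        private Integer field3;
        @Nullable public Integer getField3() { return field3; }
        public void setField3(@Nullable Integer field3) { this.field3 = field3; }
    }

    // True if the named public getter carries an annotation whose simple
    // name is "Nullable". Annotations on the backing field are not consulted.
    static boolean isNullableGetter(Class<?> beanClass, String getterName) {
        try {
            Method getter = beanClass.getMethod(getterName);
            return Arrays.stream(getter.getAnnotations())
                .anyMatch(a -> a.annotationType().getSimpleName().equals("Nullable"));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public getter " + getterName, e);
        }
    }

    public static void main(String[] args) {
        // The field-level annotation is invisible when only the getter is inspected.
        System.out.println(isNullableGetter(FieldAnnotated.class, "getField3"));
        System.out.println(isNullableGetter(AccessorAnnotated.class, "getField3"));
    }
}
```

If that assumption holds, it would explain why my field-level annotations
still produce "NOT NULL" fields once the class is treated as a bean.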
Another question: does the Dataflow update pipeline feature work only with
SchemaCoder? I have SerializableCoder and ProtoCoder in use in some of my
other pipelines. I wonder if I have to change those too to open the door to
updating pipelines there?

On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]>
wrote:

> Some progress made.
> Now I can see that the schema has all the needed fields. But all fields in
> the returned schema are marked as "NOT NULL", even though the fields are
> annotated with @javax.annotation.Nullable.
> Is there a different flavor of @Nullable I need to use on my fields?
>
> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>
>> Ah, this isn't a POJO then. In this case, try using
>> @DefaultSchema(JavaBeanSchema.class)
>>
>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> It looks something like this. It currently has around 40 fields.
>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>> @DefaultSchema(JavaFieldSchema.class)
>>> @EqualsAndHashCode
>>> @ToString
>>> public class POJO implements SomeInterface, Serializable {
>>>
>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>
>>>     @JsonProperty("field_1")
>>>     private String field1;
>>>     @JsonProperty("field_2")
>>>     private String field2;
>>>
>>>     @Nullable
>>>     @JsonProperty("field_3")
>>>     private Integer field3;
>>>
>>>     .....
>>>
>>>     @JsonProperty("field_1")
>>>     public String getField1() {
>>>         return field1;
>>>     }
>>>
>>>     @JsonProperty("field_1")
>>>     public void setField1(String field1) {
>>>         this.field1 = field1;
>>>     }
>>>
>>>
>>>     @JsonProperty("field_2")
>>>     public String getField2() {
>>>         return field2;
>>>     }
>>>
>>>     @JsonProperty("field_2")
>>>     public void setField2(String field2) {
>>>         this.field2 = field2;
>>>     }
>>>
>>>     @JsonProperty("field_3")
>>>     public Integer getField3() {
>>>         return field3;
>>>     }
>>>
>>>     @JsonProperty("field_3")
>>>     public void setField3(Integer field3) {
>>>         this.field3 = field3;
>>>     }
>>>
>>>
>>>     public POJO() {
>>>
>>>          // 0 arg empty constructor
>>>     }
>>>     public POJO(POJO other) {
>>>
>>>        // ... copy constructor
>>>     }
>>>
>>> }
>>>
>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Can you paste the code for your Pojo?
>>>>
>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> schemaRegistry.getSchema(dataType) is returning an empty schema for me.
>>>>> My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>> Is there something extra I need to do here to register my class with the
>>>>> schema registry?
>>>>> Note: the code which is building the pipeline is sitting in a library
>>>>> (Different package) which is being imported into my pipeline code. So
>>>>> perhaps there is some configuration which is missing which allows the
>>>>> framework to discover my Pojo and the annotations associated with it.
>>>>>
>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>> building my coder from the class.
>>>>>>>> Something like:
>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>> Coder<Data> dataCoder =
>>>>>>>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>
>>>>>>>
>>>>>>> This will work. Though, did annotating the POJO like I said not
>>>>>>> work?
>>>>>>>
>>>>>> No, the annotation alone does not work since I am not using concrete
>>>>>> classes in the code where the pipeline is being constructed. <Data> above
>>>>>> is a generic type parameter of the class which constructs the pipeline.
>>>>>>
>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Removing the setCoder call breaks my pipeline.
>>>>>>>>>
>>>>>>>>> No Coder has been manually specified;  you may do so using
>>>>>>>>> .setCoder().
>>>>>>>>>
>>>>>>>>>   Inferring a Coder from the CoderRegistry failed: Unable to
>>>>>>>>> provide a Coder for Data.
>>>>>>>>>
>>>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>>>
>>>>>>>>> The reason is that the code which builds the pipeline is based on
>>>>>>>>> Java generics. The actual pipeline-building code sets a bunch of
>>>>>>>>> parameters which are used to construct the pipeline.
>>>>>>>>> PCollection<Data> stream =
>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder);
>>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>>> framework to make the annotation work.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> If you annotate your POJO
>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will usually
>>>>>>>>>> automatically set up schema inference (you'll have to remove the
>>>>>>>>>> setCoder call).
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>
>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I don't think we make any guarantees about Avro coder. Can you
>>>>>>>>>>>> use Beam's schema encoding instead?
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to programmatically check for compatibility? I
>>>>>>>>>>>>> would like to fail my unit tests if incompatible changes are made
>>>>>>>>>>>>> to the Pojo.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before and
>>>>>>>>>>>>>> after the change to ensure that they are compatible as you
>>>>>>>>>>>>>> expect.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is more of a Dataflow question I guess, but I am asking
>>>>>>>>>>>>>>> here in hopes someone has faced a similar problem and can help.
>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a running
>>>>>>>>>>>>>>> Dataflow job. I am noticing that compatibility checks fail any
>>>>>>>>>>>>>>> time I add a new field to my data model. The error says:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am using a Java Pojo for the data and the Avro coder to
>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding new optional
>>>>>>>>>>>>>>> fields to the data should work when updating the pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am fine with changing the coder or the implementation of the
>>>>>>>>>>>>>>> model to something which allows me to update the pipeline in
>>>>>>>>>>>>>>> cases where I add new optional fields to the existing model. Any
>>>>>>>>>>>>>>> suggestions?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
