It looks something like this. It currently has around 40 fields.
@JsonIgnoreProperties(ignoreUnknown = true)
@DefaultSchema(JavaFieldSchema.class)
@EqualsAndHashCode
@ToString
public class POJO implements SomeInterface, Serializable {

    public static final Integer SOME_CONSTANT_FIELD = 2;

    @JsonProperty("field_1")
    private String field1;
    @JsonProperty("field_2")
    private String field2;

    @Nullable
    @JsonProperty("field_3")
    private Integer field3;

    .....

    @JsonProperty("field_1")
    public String getField1() {
        return field1;
    }

    @JsonProperty("field_1")
    public void setField1(String field1) {
        this.field1 = field1;
    }


    @JsonProperty("field_2")
    public String getField2() {
        return field2;
    }

    @JsonProperty("field_2")
    public void setField2(String field2) {
        this.field2 = field2;
    }

    @JsonProperty("field_3")
    public Integer getField3() {
        return field3;
    }

    @JsonProperty("field_3")
    public void setField3(Integer field3) {
        this.field3 = field3;
    }


    public POJO() {

         0 arg empty constructor
    }
    public POJO(POJO other) {

       ... copy constructor
    }

}

On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:

> Can you paste the code for your Pojo?
>
> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]>
> wrote:
>
>>
>> schemaRegistry.getSchema(dataType) for me is returning a empty schema.
>> my Pojo is annotated with the @DefaultSchema(JavaFieldSchema.class)
>> Is there something extra I need to do here to register my class with a
>> schema registry.
>> Note: the code which is building the pipeline is sitting in a library
>> (Different package) which is being imported into my pipeline code. So
>> perhaps there is some configuration which is missing which allows the
>> framework to discover my Pojo and the annotations associated with it.
>>
>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>>
>>>
>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> I think I can make it work now. I found a utility method for building
>>>>> my coder from class
>>>>> Something like
>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>> Coder<Data> dataCoder =
>>>>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>                 TypeDescriptor.of(dataClass),
>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>
>>>>
>>>> This will work. Though, did annotating the POJO like I said not work?
>>>>
>>>  No, annotation alone does not work since I am not using concrete
>>> classes in the code where the pipeline is being constructed. <Data> above
>>> is a template variable in the class which is constructing the pipeline.
>>>
>>>>
>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> removing setCoder call breaks my pipeline.
>>>>>>
>>>>>> No Coder has been manually specified;  you may do so using
>>>>>> .setCoder().
>>>>>>
>>>>>>   Inferring a Coder from the CoderRegistry failed: Unable to provide
>>>>>> a Coder for Data.
>>>>>>
>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>
>>>>>> Reason being the code which is building the pipeline is based on Java
>>>>>> Generics. Actual pipeline building code sets a bunch of parameters which
>>>>>> are used to construct the pipeline.
>>>>>> PCollection<Data> stream =
>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>> So I guess I will need to provide some more information to the
>>>>>> framework to make the annotation work.
>>>>>>
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> If you annotate your POJO
>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will usually 
>>>>>>> automatically
>>>>>>> set up schema inference (you'll have to remove the setCoder call).
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> How to set up my pipeline to use Beam's schema encoding.
>>>>>>>> In my current code I am doing something like this
>>>>>>>>
>>>>>>>> PCollection<Data> =
>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I don't think we make any guarantees about Avro coder. Can you use
>>>>>>>>> Beam's schema encoding instead?
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Is there a way to programmatically check for compatibility? I
>>>>>>>>>> would like to fail my unit tests if incompatible changes are made to 
>>>>>>>>>> Pojo.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Check the schema of the avro encoding for the POJO before and
>>>>>>>>>>> after the change to ensure that they are compatible as you expect.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This is more of a Dataflow question I guess but asking here in
>>>>>>>>>>>> hopes someone has faced a similar problem and can help.
>>>>>>>>>>>> I am trying to use "--update" option to update a running
>>>>>>>>>>>> Dataflow job. I am noticing that compatibility checks fail any 
>>>>>>>>>>>> time I add a
>>>>>>>>>>>> new field to my data model. Error says
>>>>>>>>>>>>
>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I am using a Java Pojo for data.  Avro coder to serialize the 
>>>>>>>>>>>> model. I read somewhere that adding new optional fields to the 
>>>>>>>>>>>> data should work when updating the pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> I am fine with updating the coder or implementation of the model 
>>>>>>>>>>>> to something which allows me to update the pipeline in cases when 
>>>>>>>>>>>> I add new optional fields to existing model. Any suggestions?
>>>>>>>>>>>>
>>>>>>>>>>>>

Reply via email to