I think I can make it work now. I found a utility method for building my
coder from class
Something like
Class<Data> dataClass = userConfig.getDataClass();
Coder<Data> dataCoder = SchemaCoder.of(schemaRegistry.getSchema(dataClass),
                TypeDescriptor.of(dataClass),
                schemaRegistry.getToRowFunction(dataClass),
                schemaRegistry.getFromRowFunction(dataClass));

My next question is -
Assuming I start using SchemaCoder for all my Pojos.
What kind of changes are allowed on Pojo in future to not break
compatibility? For instance, can I change the order of fields in Pojo? I
believe I should be able to add a new field with @Nullable annotation, is
that correct?

Is there a way I can set up a unit test to catch breaking changes in
future.  Something like
UnitTestBaselinePojo pojo1 = Copy paste of todays pojo in unit test package
RealPojo pojo2 = ....  real pojo from main package.
SomeUtil.incompatible(pojo1, pojo2)



On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]>
wrote:

> removing setCoder call breaks my pipeline.
>
> No Coder has been manually specified;  you may do so using .setCoder().
>
>   Inferring a Coder from the CoderRegistry failed: Unable to provide a
> Coder for Data.
>
>   Building a Coder using a registered CoderProvider failed.
>
> Reason being the code which is building the pipeline is based on Java
> Generics. Actual pipeline building code sets a bunch of parameters which
> are used to construct the pipeline.
> PCollection<Data> stream =
> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
> So I guess I will need to provide some more information to the framework
> to make the annotation work.
>
>
> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>
>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class),
>> that will usually automatically set up schema inference (you'll have to
>> remove the setCoder call).
>>
>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> How to set up my pipeline to use Beam's schema encoding.
>>> In my current code I am doing something like this
>>>
>>> PCollection<Data> =
>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>
>>>
>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> I don't think we make any guarantees about Avro coder. Can you use
>>>> Beam's schema encoding instead?
>>>>
>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> Is there a way to programmatically check for compatibility? I
>>>>> would like to fail my unit tests if incompatible changes are made to Pojo.
>>>>>
>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> Check the schema of the avro encoding for the POJO before and after
>>>>>> the change to ensure that they are compatible as you expect.
>>>>>>
>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> This is more of a Dataflow question I guess but asking here in hopes
>>>>>>> someone has faced a similar problem and can help.
>>>>>>> I am trying to use "--update" option to update a running Dataflow
>>>>>>> job. I am noticing that compatibility checks fail any time I add a new
>>>>>>> field to my data model. Error says
>>>>>>>
>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>
>>>>>>>
>>>>>>> I am using a Java Pojo for data.  Avro coder to serialize the model. I 
>>>>>>> read somewhere that adding new optional fields to the data should work 
>>>>>>> when updating the pipeline.
>>>>>>>
>>>>>>> I am fine with updating the coder or implementation of the model to 
>>>>>>> something which allows me to update the pipeline in cases when I add 
>>>>>>> new optional fields to existing model. Any suggestions?
>>>>>>>
>>>>>>>

Reply via email to