schemaRegistry.getSchema(dataType) is returning an empty schema for me.
My POJO is annotated with @DefaultSchema(JavaFieldSchema.class).
Is there something extra I need to do here to register my class with the
schema registry?
Note: the code that builds the pipeline lives in a library (a different
package) which is imported into my pipeline code, so perhaps some
configuration is missing that would allow the framework to discover my
POJO and the annotations associated with it.
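
One workaround that may sidestep annotation discovery entirely is to
register the class on the pipeline's SchemaRegistry explicitly before
looking the schema up. A rough sketch, assuming the Pipeline object is
reachable from the generic library code; the SchemaCoders helper, the
Data fields, and the Class<T> parameter are placeholders, not code from
this thread:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.values.TypeDescriptor;

// In its own file: the POJO stays annotated as described above.
@DefaultSchema(JavaFieldSchema.class)
public class Data {
  public String id;        // placeholder fields
  public long timestamp;
}

public class SchemaCoders {
  // Builds a schema-based coder for a POJO class that is only known as a
  // Class<T> at pipeline-construction time.
  public static <T> Coder<T> schemaCoderFor(Pipeline pipeline, Class<T> dataClass)
      throws NoSuchSchemaException {
    SchemaRegistry registry = pipeline.getSchemaRegistry();
    // Register the POJO explicitly so the schema lookup does not depend on
    // the annotation being discovered on its own.
    registry.registerPOJO(dataClass);
    return SchemaCoder.of(
        registry.getSchema(dataClass),
        TypeDescriptor.of(dataClass),
        registry.getToRowFunction(dataClass),
        registry.getFromRowFunction(dataClass));
  }
}

The resulting coder can then be passed to setCoder() the same way as the
SchemaCoder built further down in this thread.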

On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]>
wrote:

>
>
> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>
>>
>>
>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> I think I can make it work now. I found a utility method for building my
>>> coder from the class.
>>> Something like:
>>> Class<Data> dataClass = userConfig.getDataClass();
>>> Coder<Data> dataCoder = SchemaCoder.of(
>>>     schemaRegistry.getSchema(dataClass),
>>>     TypeDescriptor.of(dataClass),
>>>     schemaRegistry.getToRowFunction(dataClass),
>>>     schemaRegistry.getFromRowFunction(dataClass));
>>>
>>
>> This will work. Though, did annotating the POJO like I said not work?
>>
> No, the annotation alone does not work, since I am not using concrete
> classes in the code where the pipeline is constructed. <Data> above is a
> generic type parameter of the class which constructs the pipeline.
>
>>
>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> Removing the setCoder call breaks my pipeline:
>>>>
>>>> No Coder has been manually specified;  you may do so using .setCoder().
>>>>
>>>>   Inferring a Coder from the CoderRegistry failed: Unable to provide a
>>>> Coder for Data.
>>>>
>>>>   Building a Coder using a registered CoderProvider failed.
>>>>
>>>> The reason is that the code building the pipeline is based on Java
>>>> generics. The actual pipeline-building code sets a bunch of parameters
>>>> which are used to construct the pipeline:
>>>> PCollection<Data> stream = pipeline
>>>>     .apply(userProvidedTransform)
>>>>     .get(outputTag)
>>>>     .setCoder(userProvidedCoder);
>>>> So I guess I will need to provide some more information to the
>>>> framework to make the annotation work.
>>>>
>>>>
>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class),
>>>>> that will usually automatically set up schema inference (you'll have to
>>>>> remove the setCoder call).
>>>>>
>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>> In my current code I am doing something like this:
>>>>>>
>>>>>> PCollection<Data> stream = pipeline
>>>>>>     .apply(someTransform)
>>>>>>     .get(outputTag)
>>>>>>     .setCoder(AvroCoder.of(Data.class));
>>>>>>
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> I don't think we make any guarantees about Avro coder. Can you use
>>>>>>> Beam's schema encoding instead?
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Is there a way to programmatically check for compatibility? I would
>>>>>>>> like to fail my unit tests if incompatible changes are made to the
>>>>>>>> POJO.
>>>>>>>>
>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Check the schema of the avro encoding for the POJO before and
>>>>>>>>> after the change to ensure that they are compatible as you expect.
>>>>>>>>>
>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> This is more of a Dataflow question, I guess, but I am asking here
>>>>>>>>>> in the hope that someone has faced a similar problem and can help.
>>>>>>>>>> I am trying to use the "--update" option to update a running
>>>>>>>>>> Dataflow job. I am noticing that compatibility checks fail any time
>>>>>>>>>> I add a new field to my data model. The error says:
>>>>>>>>>>
>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I am using a Java POJO for the data and AvroCoder to serialize the
>>>>>>>>>> model. I read somewhere that adding new optional fields to the data
>>>>>>>>>> should work when updating the pipeline.
>>>>>>>>>>
>>>>>>>>>> I am fine with updating the coder or the implementation of the model
>>>>>>>>>> to something that allows me to update the pipeline when I add new
>>>>>>>>>> optional fields to the existing model. Any suggestions?
>>>>>>>>>>
>>>>>>>>>>
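
For the question up-thread about programmatically checking compatibility:
Avro ships a schema compatibility checker that can back a unit test along
the lines Luke suggested. A minimal sketch, assuming a golden copy of the
previously deployed schema is kept under src/test/resources; the
data-v1.avsc file, the test class name, and the Data POJO reference are
placeholders, not something from this thread:

import static org.junit.Assert.assertEquals;

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import org.apache.beam.sdk.coders.AvroCoder;
import org.junit.Test;

public class DataSchemaCompatibilityTest {
  @Test
  public void newSchemaCanReadDataWrittenWithOldSchema() throws Exception {
    // data-v1.avsc is a golden copy of the schema the running job was
    // deployed with, e.g. dumped via AvroCoder.of(Data.class).getSchema().
    Schema oldSchema =
        new Schema.Parser().parse(new File("src/test/resources/data-v1.avsc"));
    Schema newSchema = AvroCoder.of(Data.class).getSchema();

    // Fail the build if records written with the old schema cannot be read
    // using the new one.
    SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
    assertEquals(SchemaCompatibilityType.COMPATIBLE, result.getType());
  }
}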
