On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:

>
>
> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]>
> wrote:
>
>> I think I can make it work now. I found a utility method for building my
>> coder from class
>> Something like
>> Class<Data> dataClass = userConfig.getDataClass();
>> Coder<Data> dataCoder =
>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>                 TypeDescriptor.of(dataClass),
>>                 schemaRegistry.getToRowFunction(dataClass),
>>                 schemaRegistry.getFromRowFunction(dataClass));
>>
>
> This will work. Though, did annotating the POJO like I said not work?
>
 No, annotation alone does not work since I am not using concrete classes
in the code where the pipeline is being constructed. <Data> above is a
template variable in the class which is constructing the pipeline.

>
>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> removing setCoder call breaks my pipeline.
>>>
>>> No Coder has been manually specified;  you may do so using .setCoder().
>>>
>>>   Inferring a Coder from the CoderRegistry failed: Unable to provide a
>>> Coder for Data.
>>>
>>>   Building a Coder using a registered CoderProvider failed.
>>>
>>> Reason being the code which is building the pipeline is based on Java
>>> Generics. Actual pipeline building code sets a bunch of parameters which
>>> are used to construct the pipeline.
>>> PCollection<Data> stream =
>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>> So I guess I will need to provide some more information to the framework
>>> to make the annotation work.
>>>
>>>
>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class),
>>>> that will usually automatically set up schema inference (you'll have to
>>>> remove the setCoder call).
>>>>
>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> How to set up my pipeline to use Beam's schema encoding.
>>>>> In my current code I am doing something like this
>>>>>
>>>>> PCollection<Data> =
>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>
>>>>>
>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> I don't think we make any guarantees about Avro coder. Can you use
>>>>>> Beam's schema encoding instead?
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Is there a way to programmatically check for compatibility? I
>>>>>>> would like to fail my unit tests if incompatible changes are made to 
>>>>>>> Pojo.
>>>>>>>
>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> Check the schema of the avro encoding for the POJO before and after
>>>>>>>> the change to ensure that they are compatible as you expect.
>>>>>>>>
>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> This is more of a Dataflow question I guess but asking here in
>>>>>>>>> hopes someone has faced a similar problem and can help.
>>>>>>>>> I am trying to use "--update" option to update a running Dataflow
>>>>>>>>> job. I am noticing that compatibility checks fail any time I add a new
>>>>>>>>> field to my data model. Error says
>>>>>>>>>
>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am using a Java Pojo for data.  Avro coder to serialize the model. 
>>>>>>>>> I read somewhere that adding new optional fields to the data should 
>>>>>>>>> work when updating the pipeline.
>>>>>>>>>
>>>>>>>>> I am fine with updating the coder or implementation of the model to 
>>>>>>>>> something which allows me to update the pipeline in cases when I add 
>>>>>>>>> new optional fields to existing model. Any suggestions?
>>>>>>>>>
>>>>>>>>>

Reply via email to