I now have SchemaCoder for all data flowing through the pipeline.
I am seeing another issue when a new pipeline is started with the --update
flag. In my stateful DoFn, the call to prevState.read() throws exceptions
like:
Caused by: java.io.EOFException: reached end of stream after reading 68
bytes; 101 bytes expected
In this case there was no change in code or data model when relaunching the
pipeline.

It seems that the Schema for my Bean is changing on every run of the
pipeline. I manually inspected the Schema returned by the SchemaRegistry by
running the pipeline a few times locally, and each time the order of fields
in the Schema was different. This change in field order is breaking the new
pipeline. Is there a way to enforce some kind of ordering on the fields?
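
One likely cause, stated as an assumption rather than something confirmed
in this thread: a schema inferred from getters depends on the order in which
reflection returns methods, and the Javadoc for Class.getDeclaredMethods()
says the returned array is not in any particular order. Below is a minimal
sketch in plain Java (no Beam calls) of deriving a sorted, repeatable
property order that a unit test could pin. SampleBean and
sortedPropertyNames are hypothetical names used only for illustration.
Beam also appears to offer a @SchemaFieldNumber annotation for pinning
field positions, if the SDK version in use includes it.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StableFieldOrder {

    // Hypothetical stand-in for the real bean; only the getters matter here.
    static class SampleBean {
        public String getField2() { return null; }
        public Integer getField3() { return null; }
        public String getField1() { return null; }
    }

    // Collects property names from public, non-static, zero-arg getXxx()
    // methods and sorts them, so the result does not depend on the order
    // in which reflection happens to return the methods.
    static List<String> sortedPropertyNames(Class<?> beanClass) {
        List<String> names = new ArrayList<>();
        for (Method m : beanClass.getDeclaredMethods()) {
            String n = m.getName();
            boolean isGetter = !m.isSynthetic()
                    && Modifier.isPublic(m.getModifiers())
                    && !Modifier.isStatic(m.getModifiers())
                    && m.getParameterCount() == 0
                    && n.startsWith("get")
                    && n.length() > 3;
            if (isGetter) {
                // getField1 -> field1
                names.add(Character.toLowerCase(n.charAt(3)) + n.substring(4));
            }
        }
        Collections.sort(names);
        return names;
    }
}
```

A unit test could compare this list against a checked-in copy and fail when
a property is added, removed, or renamed. That catches model drift early,
though it is not a substitute for the runner's own compatibility check.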


On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <[email protected]>
wrote:

> Looking at the source code for JavaBeanSchema, it seems I will have to
> annotate my getters and setters with any flavour of @Nullable.
> Another question -
> does the dataflow update pipeline feature work only with SchemaCoder? I
> have SerializableCoder and ProtoCoder being used in some of my other
> pipelines. I wonder if I have to change those too to open doors for
> updating pipelines there?
>
> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]>
> wrote:
>
>> Some progress made.
>> Now I can see that the schema has all the needed fields. But all fields in
>> the returned schema are marked as "NOT NULL", even though the fields are
>> annotated with @javax.annotation.Nullable.
>> Is there a different flavor of @Nullable I need to use on my fields?
>>
>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>
>>> Ah, this isn't a POJO then. In this case, try using
>>> @DefaultSchema(JavaBeanSchema.class)
>>>
>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> It looks something like this. It currently has around 40 fields.
>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> @EqualsAndHashCode
>>>> @ToString
>>>> public class POJO implements SomeInterface, Serializable {
>>>>
>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>
>>>>     @JsonProperty("field_1")
>>>>     private String field1;
>>>>     @JsonProperty("field_2")
>>>>     private String field2;
>>>>
>>>>     @Nullable
>>>>     @JsonProperty("field_3")
>>>>     private Integer field3;
>>>>
>>>>     .....
>>>>
>>>>     @JsonProperty("field_1")
>>>>     public String getField1() {
>>>>         return field1;
>>>>     }
>>>>
>>>>     @JsonProperty("field_1")
>>>>     public void setField1(String field1) {
>>>>         this.field1 = field1;
>>>>     }
>>>>
>>>>
>>>>     @JsonProperty("field_2")
>>>>     public String getField2() {
>>>>         return field2;
>>>>     }
>>>>
>>>>     @JsonProperty("field_2")
>>>>     public void setField2(String field2) {
>>>>         this.field2 = field2;
>>>>     }
>>>>
>>>>     @JsonProperty("field_3")
>>>>     public Integer getField3() {
>>>>         return field3;
>>>>     }
>>>>
>>>>     @JsonProperty("field_3")
>>>>     public void setField3(Integer field3) {
>>>>         this.field3 = field3;
>>>>     }
>>>>
>>>>
>>>>     public POJO() {
>>>>         // zero-arg empty constructor
>>>>     }
>>>>     public POJO(POJO other) {
>>>>         // ... copy constructor
>>>>     }
>>>>
>>>> }
>>>>
>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Can you paste the code for your Pojo?
>>>>>
>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>> schemaRegistry.getSchema(dataType) is returning an empty schema
>>>>>> for me.
>>>>>> My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>>> Is there something extra I need to do here to register my class with
>>>>>> a schema registry?
>>>>>> Note: the code which is building the pipeline is sitting in a library
>>>>>> (Different package) which is being imported into my pipeline code. So
>>>>>> perhaps there is some configuration which is missing which allows the
>>>>>> framework to discover my Pojo and the annotations associated with it.
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>>> building my coder from class
>>>>>>>>> Something like
>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>
>>>>>>>>
>>>>>>>> This will work. Though, did annotating the POJO like I said not
>>>>>>>> work?
>>>>>>>>
>>>>>>> No, annotation alone does not work since I am not using concrete
>>>>>>> classes in the code where the pipeline is being constructed. <Data>
>>>>>>> above is a template variable in the class which is constructing the
>>>>>>> pipeline.
>>>>>>>
>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> removing setCoder call breaks my pipeline.
>>>>>>>>>>
>>>>>>>>>> No Coder has been manually specified;  you may do so using
>>>>>>>>>> .setCoder().
>>>>>>>>>>
>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed: Unable to
>>>>>>>>>> provide a Coder for Data.
>>>>>>>>>>
>>>>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>
>>>>>>>>>> The reason is that the code which is building the pipeline is based
>>>>>>>>>> on Java Generics. The actual pipeline building code sets a bunch of
>>>>>>>>>> parameters which are used to construct the pipeline.
>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>>>> framework to make the annotation work.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you annotate your POJO with
>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class), that will usually
>>>>>>>>>>> automatically set up schema inference (you'll have to remove the
>>>>>>>>>>> setCoder call).
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>>
>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think we make any guarantees about Avro coder. Can you
>>>>>>>>>>>>> use Beam's schema encoding instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there a way to programmatically check for compatibility? I
>>>>>>>>>>>>>> would like to fail my unit tests if incompatible changes are
>>>>>>>>>>>>>> made to the Pojo.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Check the schema of the avro encoding for the POJO before
>>>>>>>>>>>>>>> and after the change to ensure that they are compatible as
>>>>>>>>>>>>>>> you expect.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess but asking here
>>>>>>>>>>>>>>>> in hopes someone has faced a similar problem and can help.
>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a running
>>>>>>>>>>>>>>>> Dataflow job. I am noticing that compatibility checks fail any
>>>>>>>>>>>>>>>> time I add a new field to my data model. The error says
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am using a Java Pojo for data and Avro coder to serialize
>>>>>>>>>>>>>>>> the model. I read somewhere that adding new optional fields
>>>>>>>>>>>>>>>> to the data should work when updating the pipeline.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am fine with changing the coder or the implementation of
>>>>>>>>>>>>>>>> the model to something which allows me to update the pipeline
>>>>>>>>>>>>>>>> when I add new optional fields to the existing model. Any
>>>>>>>>>>>>>>>> suggestions?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
