So does that mean my only option is to move away from using a Java class and
use PCollection<Row> everywhere if I want to be able to update my pipeline?
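
For example, would the rest of the pipeline then have to look something
like this? (A rough sketch; I am assuming here that Convert.toRows() picks
up the schema registered for Data.)

PCollection<Data> input = pipeline.apply(userProvidedTransform).get(outputTag);
// Convert the schema-aware PCollection<Data> into generic Rows.
PCollection<Row> rows = input.apply(Convert.toRows());
// All downstream transforms would then consume and produce Row.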

On Mon, Jan 10, 2022 at 12:06 AM Reuven Lax <[email protected]> wrote:

> SerializableCoder can be tricky for updates, and can also be very
> inefficient.
>
> As you noticed, when inferring schemas from a Java class, the order of
> fields is non-deterministic (not much that we can do about this, as Java
> doesn't return the fields in a deterministic order). For regular
> PCollections, Dataflow (though currently only Dataflow) will allow update
> even though the field orders have changed. Unfortunately
>
> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
> [email protected]> wrote:
>
>> I now have SchemaCoder for all data flowing through the pipeline.
>> I am noticing another issue once a new pipeline is started with the
>> --update flag. In my stateful DoFn, where I am doing prevState.read(),
>> exceptions are being thrown, with errors like:
>> Caused by: java.io.EOFException: reached end of stream after reading 68
>> bytes; 101 bytes expected
>> In this case there was no change in code or data model when relaunching
>> the pipeline.
>>
>> It seems that the Schema for my Bean is changing on every run of the
>> pipeline. I manually inspected the Schema returned by the SchemaRegistry
>> by running the pipeline a few times locally, and each time the order of
>> fields in the Schema was different. This changing field order is breaking
>> the new pipeline. Is there a way to enforce some kind of ordering on the
>> fields?
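>>
>> For what it's worth, one thing I might try is pinning the order
>> explicitly with @SchemaFieldNumber (assuming my Beam version supports
>> it), something like:
>>
>> @DefaultSchema(JavaBeanSchema.class)
>> public class POJO implements SomeInterface, Serializable {
>>     private String field1;
>>
>>     // An explicit field number should make this field's position in
>>     // the inferred schema deterministic across JVM runs.
>>     @SchemaFieldNumber("0")
>>     public String getField1() { return field1; }
>>
>>     public void setField1(String field1) { this.field1 = field1; }
>> }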
>>
>>
>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> Looking at the source code for JavaBeanSchema, it seems I will have to
>>> annotate my getters and setters with some flavour of @Nullable.
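>>> Something like this, presumably:
>>>
>>> @javax.annotation.Nullable
>>> @JsonProperty("field_3")
>>> public Integer getField3() {
>>>     return field3;
>>> }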
>>> Another question -
>>> does the Dataflow update pipeline feature work only with SchemaCoder? I
>>> have SerializableCoder and ProtoCoder being used in some of my other
>>> pipelines. I wonder if I have to change those too to make those
>>> pipelines updatable as well?
>>>
>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> Some progress made.
>>>> Now I can see that the schema has all the needed fields, but all
>>>> fields in the returned schema are marked as "NOT NULL", even though
>>>> the fields are annotated with @javax.annotation.Nullable.
>>>> Is there a different flavor of @Nullable I need to use on my fields?
>>>>
>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>> @DefaultSchema(JavaBeanSchema.class).
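>>>>> i.e. roughly:
>>>>>
>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>     // With JavaBeanSchema, Beam infers the schema from the
>>>>>     // getter/setter pairs rather than from the private fields.
>>>>>     ...
>>>>> }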
>>>>>
>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>> @EqualsAndHashCode
>>>>>> @ToString
>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>
>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>
>>>>>>     @JsonProperty("field_1")
>>>>>>     private String field1;
>>>>>>     @JsonProperty("field_2")
>>>>>>     private String field2;
>>>>>>
>>>>>>     @Nullable
>>>>>>     @JsonProperty("field_3")
>>>>>>     private Integer field3;
>>>>>>
>>>>>>     .....
>>>>>>
>>>>>>     @JsonProperty("field_1")
>>>>>>     public String getField1() {
>>>>>>         return field1;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_1")
>>>>>>     public void setField1(String field1) {
>>>>>>         this.field1 = field1;
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @JsonProperty("field_2")
>>>>>>     public String getField2() {
>>>>>>         return field2;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_2")
>>>>>>     public void setField2(String field2) {
>>>>>>         this.field2 = field2;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_3")
>>>>>>     public Integer getField3() {
>>>>>>         return field3;
>>>>>>     }
>>>>>>
>>>>>>     @JsonProperty("field_3")
>>>>>>     public void setField3(Integer field3) {
>>>>>>         this.field3 = field3;
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     public POJO() {
>>>>>>         // 0-arg empty constructor
>>>>>>     }
>>>>>>
>>>>>>     public POJO(POJO other) {
>>>>>>         // ... copy constructor
>>>>>>     }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Can you paste the code for your Pojo?
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning an empty
>>>>>>>> schema.
>>>>>>>> My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>>>>> Is there something extra I need to do here to register my class
>>>>>>>> with a schema registry?
>>>>>>>> Note: the code which is building the pipeline sits in a library
>>>>>>>> (different package) which is imported into my pipeline code. So
>>>>>>>> perhaps there is some configuration missing which would allow the
>>>>>>>> framework to discover my Pojo and the annotations associated with
>>>>>>>> it.
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>>>>> building my coder from the class.
>>>>>>>>>>> Something like:
>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
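>>>>>>>>>>>
>>>>>>>>>>> and then setting it explicitly on the output, in place of the
>>>>>>>>>>> user-provided coder:
>>>>>>>>>>>
>>>>>>>>>>> PCollection<Data> stream = pipeline
>>>>>>>>>>>     .apply(userProvidedTransform)
>>>>>>>>>>>     .get(outputTag)
>>>>>>>>>>>     .setCoder(dataCoder);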
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This will work. Though, did annotating the POJO like I said not
>>>>>>>>>> work?
>>>>>>>>>>
>>>>>>>>> No, annotation alone does not work, since I am not using concrete
>>>>>>>>> classes in the code where the pipeline is being constructed. <Data>
>>>>>>>>> above is a generic type parameter in the class which is
>>>>>>>>> constructing the pipeline.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>>>>>
>>>>>>>>>>>> No Coder has been manually specified;  you may do so using
>>>>>>>>>>>> .setCoder().
>>>>>>>>>>>>
>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed: Unable to
>>>>>>>>>>>> provide a Coder for Data.
>>>>>>>>>>>>
>>>>>>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>
>>>>>>>>>>>> The reason is that the code which builds the pipeline is based
>>>>>>>>>>>> on Java generics. The actual pipeline-building code sets a bunch
>>>>>>>>>>>> of parameters which are used to construct the pipeline:
>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>>>>>> framework to make the annotation work.
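>>>>>>>>>>>>
>>>>>>>>>>>> Maybe registering the class with the schema registry up front
>>>>>>>>>>>> would be enough, something like this (untested):
>>>>>>>>>>>>
>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>> // Register the POJO so that coder inference can find a schema
>>>>>>>>>>>> // for the generic type at pipeline construction time.
>>>>>>>>>>>> pipeline.getSchemaRegistry().registerPOJO(TypeDescriptor.of(dataClass));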
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will usually 
>>>>>>>>>>>>> automatically
>>>>>>>>>>>>> set up schema inference (you'll have to remove the setCoder call).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PCollection<Data> =
>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro coder. Can
>>>>>>>>>>>>>>> you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is there a way to programmatically check for compatibility?
>>>>>>>>>>>>>>>> I would like to fail my unit tests if incompatible changes
>>>>>>>>>>>>>>>> are made to the Pojo.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before
>>>>>>>>>>>>>>>>> and after the change to ensure that they are compatible as
>>>>>>>>>>>>>>>>> you expect.
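>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For example, something along these lines (a rough sketch
>>>>>>>>>>>>>>>>> using Avro's compatibility checker; oldSchemaJson here is a
>>>>>>>>>>>>>>>>> hypothetical schema string saved from the previous release):
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> // Schema the running pipeline was launched with.
>>>>>>>>>>>>>>>>> Schema oldSchema = new Schema.Parser().parse(oldSchemaJson);
>>>>>>>>>>>>>>>>> // Schema of the current POJO as AvroCoder sees it.
>>>>>>>>>>>>>>>>> Schema newSchema = AvroCoder.of(POJO.class).getSchema();
>>>>>>>>>>>>>>>>> // The new (reader) schema must be able to read data
>>>>>>>>>>>>>>>>> // written with the old (writer) schema.
>>>>>>>>>>>>>>>>> SchemaCompatibility.SchemaPairCompatibility result =
>>>>>>>>>>>>>>>>>     SchemaCompatibility.checkReaderWriterCompatibility(
>>>>>>>>>>>>>>>>>         newSchema, oldSchema);
>>>>>>>>>>>>>>>>> assertEquals(
>>>>>>>>>>>>>>>>>     SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
>>>>>>>>>>>>>>>>>     result.getType());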
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is more of a Dataflow question, I guess, but I am
>>>>>>>>>>>>>>>>>> asking here in hopes someone has faced a similar problem
>>>>>>>>>>>>>>>>>> and can help.
>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a
>>>>>>>>>>>>>>>>>> running Dataflow job. I am noticing that compatibility
>>>>>>>>>>>>>>>>>> checks fail any time I add a new field to my data model.
>>>>>>>>>>>>>>>>>> The error says:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am using a Java Pojo for the data and AvroCoder to
>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding new
>>>>>>>>>>>>>>>>>> optional fields to the data should work when updating the
>>>>>>>>>>>>>>>>>> pipeline.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am fine with changing the coder or the implementation of
>>>>>>>>>>>>>>>>>> the model to something which allows me to update the
>>>>>>>>>>>>>>>>>> pipeline when I add new optional fields to the existing
>>>>>>>>>>>>>>>>>> model. Any suggestions?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
