On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:

> Gaurav,
>
> Can you try this out and tell me if it works for you?
> https://github.com/apache/beam/pull/16472
>
> This allows you to annotate all your getters with SchemaFieldNumber, to
> specify the order of fields.
>
Hi Reuven,
I was able to build and try out your code, and it seems to be working fine
so far. I will continue to do more testing with this build. But I do see a
small issue with the current implementation. The documentation says
that no gaps are allowed in those numbers, which makes it hard to use
this on classes that inherit from another class that uses this
annotation.
In my case I have an input Bean and an output Bean that extends the input
Bean. I have to configure Coders for both of these Beans. The input Bean today
uses numbers 0-40, and the output Bean uses 41-46. If tomorrow I add a new field
to the input Bean, I will have to use 47 on that field to
avoid duplicates in the output Bean. The only solution to this problem I
can think of is to allow gaps in the numbers.
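
A minimal sketch of how gap-tolerant ordering could work, using only the JDK.
The FieldNumber annotation, the two bean classes, and orderedGetters are
hypothetical names made up for this illustration; this is not Beam's
SchemaFieldNumber, nor how the PR implements it. The input Bean takes numbers
from 0 and the output Bean starts at 100, so new input fields can be added
later without colliding:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GapTolerantFieldOrder {

    // Hypothetical annotation for this sketch; NOT Beam's SchemaFieldNumber.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface FieldNumber {
        int value();
    }

    // Hypothetical input bean: numbered from 0, with room to grow below 100.
    public static class InputBean {
        @FieldNumber(0) public String getId() { return "id"; }
        @FieldNumber(1) public String getName() { return "name"; }
    }

    // Hypothetical output bean: starts at 100, leaving a gap after the input fields.
    public static class OutputBean extends InputBean {
        @FieldNumber(100) public Integer getScore() { return 1; }
        @FieldNumber(101) public String getStatus() { return "ok"; }
    }

    // Returns annotated getter names sorted by field number.
    // Gaps between numbers are accepted; duplicate numbers are rejected.
    public static List<String> orderedGetters(Class<?> beanClass) {
        Map<Integer, String> byNumber = new TreeMap<>();
        // getMethods() includes inherited public methods, in no guaranteed order.
        for (Method m : beanClass.getMethods()) {
            FieldNumber n = m.getAnnotation(FieldNumber.class);
            if (n == null) {
                continue; // not a numbered getter (e.g. methods from Object)
            }
            String previous = byNumber.put(n.value(), m.getName());
            if (previous != null) {
                throw new IllegalArgumentException(
                    "Duplicate field number " + n.value() + ": "
                        + previous + " and " + m.getName());
            }
        }
        // TreeMap iterates in ascending key order, so the result is deterministic.
        return new ArrayList<>(byNumber.values());
    }

    public static void main(String[] args) {
        System.out.println(orderedGetters(OutputBean.class));
    }
}
```

Under this scheme only the relative order of the numbers matters, so a field
added to the input Bean later could take number 2 and would sort before the
output Bean's fields.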


>
> Reuven
>
> On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:
>
>> Yes, timers will get carried over.
>>
>> On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <
>> [email protected]> wrote:
>>
>>> That DelegateCoder seems like a good idea to keep the DoFn code clean.
>>> In the short term I will perhaps use a JSON string as my intermediary and
>>> let Jackson do the serialization and deserialization. From what I have
>>> learned so far, Dataflow does some magic in the background to
>>> re-order fields to make the PCollection schema compatible with the previous
>>> job (if I use SchemaCoder), so I don't have to switch away from Java
>>> beans yet. I can use the DelegateCoder for my StateSpec and continue
>>> using SchemaCoder for PCollections.
>>>
>>> @Reuven
>>> Regarding timers in stateful DoFn, do they get carried over to the new
>>> job in Dataflow?
>>>
>>>
>>>
>>>
>>> On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]>
>>> wrote:
>>>
>>>> Hello Gaurav,
>>>>
>>>> It looks like issues related to state evolution come up quite
>>>> often on this mailing list and, unfortunately, do not seem to be covered
>>>> in the Beam docs.
>>>>
>>>> I am not familiar with Google Dataflow runner, but the way I achieved
>>>> state evolution on Flink runner was a combo of:
>>>> 1 - version-controlled Avro schemas for generating POJOs (models) which
>>>> were used in stateful Beam DoFns
>>>> 2 - a generic Avro-generated POJO as an intermediary - ( schema:
>>>> string, payload: bytes )
>>>> 3 -
>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html
>>>> responsible for converting a model into an intermediary (2)
>>>>
>>>> This allowed adding and dropping nullable fields and making other
>>>> Avro forward-compatible changes without losing the state. The order of
>>>> fields also didn't matter in this setting. Unfortunately, it also meant
>>>> double serialization and deserialization. But I wasn't able to come up
>>>> with any better solution, given that there seemed to be no public info
>>>> available on the topic, or maybe due to my lack of experience.
>>>>
>>>> Relevant mail threads:
>>>> https://www.mail-archive.com/[email protected]/msg07249.html
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 <+351%20962%20950%20692> | Skype: pavel_solomin
>>>> | Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Unfortunately, as you've discovered, Dataflow's update only pays
>>>>>> attention to actual PCollections, and does not look at DoFn state
>>>>>> variables at all (historically, stateful DoFns did not yet exist when
>>>>>> Dataflow implemented its update support).
>>>>>>
>>>>>> You can file a feature request against Google to implement update on
>>>>>> state variables, as this is honestly a major feature gap. In the meantime
>>>>>> there might be some workarounds you can use until this support is added.
>>>>>> e.g.
>>>>>>    - you could convert the data into a protocol buffer before storing
>>>>>> into state; ProtoCoder should handle new fields, and fields in a protocol
>>>>>> buffer have a deterministic order.
>>>>>>
>>>>> Is there an easy way to go from a Java object to a protocol buffer
>>>>> object? The only way I can think of is to first serialize the Java model
>>>>> to JSON and then deserialize the JSON into a protocol buffer object. The
>>>>> same goes for the other direction, protobuf to Java object. While I was
>>>>> writing this I realized I could maybe store the JSON dump of my Java
>>>>> object in the state spec. That should also work.
>>>>>
>>>>>>    - you could explicitly use the Row object, although this will
>>>>>> again make your code more complex.
>>>>>>
>>>>>
>>>>>> Another intermediate option - we could fairly easily add
>>>>>> a @SchemaFieldId annotation to Beam that would allow you to specify the
>>>>>> order of fields. You would have to annotate every getter, something like
>>>>>> this:
>>>>>>
>>>>>> @SchemaFieldId(0)
>>>>>> public T getV1();
>>>>>>
>>>>>> @SchemaFieldId(1)
>>>>>> public T getV2();
>>>>>>
>>>>> Yes please. If this is an easy change and you can point me in the
>>>>> right direction, I will be happy to submit a patch for this.
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> etc.
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> SerializableCoder can be tricky for updates, and can also be very
>>>>>>> inefficient.
>>>>>>>
>>>>>>> As you noticed, when inferring schemas from a Java class, the order
>>>>>>> of fields is nondeterministic (not much that we can do about this, as
>>>>>>> Java doesn't return the fields in a deterministic order). For regular
>>>>>>> PCollections, Dataflow (though currently only Dataflow) will allow
>>>>>>> update even though the field orders have changed. Unfortunately
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I now have SchemaCoder for all data flowing through the pipeline.
>>>>>>>> I am noticing another issue once a new pipeline is started with the
>>>>>>>> --update flag. In my stateful DoFn, where I am calling prevState.read(),
>>>>>>>> exceptions are being thrown, with errors like
>>>>>>>> Caused by: java.io.EOFException: reached end of stream after
>>>>>>>> reading 68 bytes; 101 bytes expected
>>>>>>>> In this case there was no change in code or data model when
>>>>>>>> relaunching the pipeline.
>>>>>>>>
>>>>>>>> It seems that the Schema for my Bean is changing on every run of the
>>>>>>>> pipeline. I manually inspected the Schema returned by the
>>>>>>>> SchemaRegistry by running the pipeline a few times locally, and each
>>>>>>>> time I noticed the order of fields in the Schema was different. This
>>>>>>>> change in field order is breaking the new pipeline. Is there a way to
>>>>>>>> enforce some kind of ordering on the fields?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Looking at the source code for JavaBeanSchema, it seems I will have
>>>>>>>>> to annotate my getters and setters with any flavour of @Nullable.
>>>>>>>>> Another question:
>>>>>>>>> does the Dataflow update pipeline feature work only with
>>>>>>>>> SchemaCoder? I have SerializableCoder and ProtoCoder being used in
>>>>>>>>> some of my other pipelines. I wonder if I have to change those too to
>>>>>>>>> open the door to updating pipelines there?
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Some progress made.
>>>>>>>>>> Now I can see that the schema has all the needed fields. But all
>>>>>>>>>> fields in the returned schema are marked as "NOT NULL", even though
>>>>>>>>>> the fields are annotated with @javax.annotation.Nullable.
>>>>>>>>>> Is there a different flavor of @Nullable I need to use on my
>>>>>>>>>> fields?
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>>>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It looks something like this. It currently has around 40
>>>>>>>>>>>> fields.
>>>>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>> @EqualsAndHashCode
>>>>>>>>>>>> @ToString
>>>>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>>>>
>>>>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>     private String field1;
>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>     private String field2;
>>>>>>>>>>>>
>>>>>>>>>>>>     @Nullable
>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>     private Integer field3;
>>>>>>>>>>>>
>>>>>>>>>>>>     .....
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>     public String getField1() {
>>>>>>>>>>>>         return field1;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>>>>         this.field1 = field1;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>     public String getField2() {
>>>>>>>>>>>>         return field2;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>>>>         this.field2 = field2;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>     public Integer getField3() {
>>>>>>>>>>>>         return field3;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>>>>         this.field3 = field3;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>     public POJO() {
>>>>>>>>>>>>         // 0-arg empty constructor
>>>>>>>>>>>>     }
>>>>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>>>>         // ... copy constructor
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> schemaRegistry.getSchema(dataType) is returning an
>>>>>>>>>>>>>> empty schema for me.
>>>>>>>>>>>>>> My Pojo is annotated with
>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class).
>>>>>>>>>>>>>> Is there something extra I need to do here to register my
>>>>>>>>>>>>>> class with a schema registry?
>>>>>>>>>>>>>> Note: the code which is building the pipeline is sitting in a
>>>>>>>>>>>>>> library (different package) which is being imported into my
>>>>>>>>>>>>>> pipeline code. So perhaps some configuration is missing that
>>>>>>>>>>>>>> allows the framework to discover my Pojo and the annotations
>>>>>>>>>>>>>> associated with it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think I can make it work now. I found a utility method
>>>>>>>>>>>>>>>>> for building my coder from a class.
>>>>>>>>>>>>>>>>> Something like
>>>>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>>>>>>>     SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I said
>>>>>>>>>>>>>>>> not work?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> No, annotation alone does not work since I am not using
>>>>>>>>>>>>>>> concrete classes in the code where the pipeline is being
>>>>>>>>>>>>>>> constructed. <Data> above is a template variable in the class
>>>>>>>>>>>>>>> which is constructing the pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Removing the setCoder call breaks my pipeline.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> No Coder has been manually specified;  you may do so
>>>>>>>>>>>>>>>>>> using .setCoder().
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed:
>>>>>>>>>>>>>>>>>> Unable to provide a Coder for Data.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   Building a Coder using a registered CoderProvider
>>>>>>>>>>>>>>>>>> failed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The reason is that the code which is building the pipeline is
>>>>>>>>>>>>>>>>>> based on Java generics. The actual pipeline-building code sets
>>>>>>>>>>>>>>>>>> a bunch of parameters which are used to construct the pipeline.
>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>>>>>>> So I guess I will need to provide some more information
>>>>>>>>>>>>>>>>>> to the framework to make the annotation work.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will
>>>>>>>>>>>>>>>>>>> usually automatically set up schema inference (you'll have to
>>>>>>>>>>>>>>>>>>> remove the setCoder call).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>>>>>>>>>>> In my current code I am doing something like this
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro coder.
>>>>>>>>>>>>>>>>>>>>> Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests if
>>>>>>>>>>>>>>>>>>>>>> incompatible changes are made to the Pojo.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO
>>>>>>>>>>>>>>>>>>>>>>> before and after the change to ensure that they are
>>>>>>>>>>>>>>>>>>>>>>> compatible as you expect.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess, but I am
>>>>>>>>>>>>>>>>>>>>>>>> asking here in hopes someone has faced a similar
>>>>>>>>>>>>>>>>>>>>>>>> problem and can help.
>>>>>>>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a
>>>>>>>>>>>>>>>>>>>>>>>> running Dataflow job. I am noticing that compatibility
>>>>>>>>>>>>>>>>>>>>>>>> checks fail any time I add a new field to my data model.
>>>>>>>>>>>>>>>>>>>>>>>> The error says
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for data and the Avro coder to
>>>>>>>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding new
>>>>>>>>>>>>>>>>>>>>>>>> optional fields to the data should work when updating
>>>>>>>>>>>>>>>>>>>>>>>> the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or implementation of
>>>>>>>>>>>>>>>>>>>>>>>> the model to something which allows me to update the
>>>>>>>>>>>>>>>>>>>>>>>> pipeline in cases when I add new optional fields to an
>>>>>>>>>>>>>>>>>>>>>>>> existing model. Any suggestions?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
