Also, when can I expect that annotation to be available for use? I assume
that if that PR is accepted, then 2.37.0 will be the release where I can use
it, right?

On Tue, Jan 11, 2022 at 3:58 PM gaurav mishra <[email protected]>
wrote:

> That's a valid point. I can live with the current solution for now. And
> hope that Dataflow will add that support soon.
>
> Another question regarding the JavaBeanSchema implementation: I noticed we are
> using clazz.getDeclaredMethods() for preparing the schema. This ignores the
> base class's getters. Is there a reason we are not using clazz.getMethods()
> instead?
>
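For reference, the difference between the two reflection calls mentioned above can be reproduced with plain Java, independent of Beam. This is only a minimal sketch: InputBean, OutputBean, and the helper methods are hypothetical names, and it says nothing about why JavaBeanSchema uses one call rather than the other.

```java
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

class GetterDiscoveryDemo {
    // Hypothetical beans mirroring the input/output bean inheritance in this thread.
    public static class InputBean {
        public String getInputField() { return "in"; }
    }

    public static class OutputBean extends InputBean {
        public String getOutputField() { return "out"; }
    }

    // Collects zero-argument "get*" method names, skipping Object.getClass().
    static Set<String> getterNames(Method[] methods) {
        Set<String> names = new TreeSet<>();
        for (Method m : methods) {
            boolean looksLikeGetter =
                m.getName().startsWith("get") && m.getParameterCount() == 0;
            if (looksLikeGetter && !m.getName().equals("getClass")) {
                names.add(m.getName());
            }
        }
        return names;
    }

    // Only methods declared directly on the class; inherited ones are absent.
    static Set<String> declaredGetters(Class<?> clazz) {
        return getterNames(clazz.getDeclaredMethods());
    }

    // All public methods, including those inherited from superclasses.
    static Set<String> allPublicGetters(Class<?> clazz) {
        return getterNames(clazz.getMethods());
    }

    public static void main(String[] args) {
        System.out.println(declaredGetters(OutputBean.class));
        System.out.println(allPublicGetters(OutputBean.class));
    }
}
```

With these two beans, the declared-only lookup on OutputBean finds just getOutputField, while the getMethods()-based lookup also finds the inherited getInputField.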
> On Tue, Jan 11, 2022 at 3:24 PM Reuven Lax <[email protected]> wrote:
>
>> What matters is the physical order of the fields in the schema encoding.
>> If you add gaps, you'll be fine initially (since the code I added simply
>> sorts by the FieldNumber), however you'll have problems on update.
>>
>> For example, let's say you have the following
>>
>>    @SchemaFieldNumber("0") String fieldOne;
>>    @SchemaFieldNumber("50") String fieldTwo;
>>
>> If you remove the check, things will appear just fine. However if you
>> then add a new field number in that gap, you'll have problems:
>>
>>    @SchemaFieldNumber("0") String fieldOne;
>>    @SchemaFieldNumber("1") String fieldThree;
>>    @SchemaFieldNumber("50") String fieldTwo;
>>
>> Now fieldThree will be expected to be the second field encoded, and you
>> won't be able to update.
>>
>> Dataflow handles schema updates by keeping track of the full map of field
>> name -> encoding position from the old job and telling the coders of the
>> new job to maintain the old encoding positions; this forces all new fields
>> to be added to the end of the encoding. I think the long-term solution here
>> is for Dataflow to support this on state variables as well.
>>
>> Reuven
>>
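The position shift described in the message above can be sketched in plain Java. The assumption, taken from that message, is that fields are sorted by field number and the resulting index is the physical encoding position; the class and method names here are hypothetical and no Beam code is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FieldNumberGapDemo {
    // Returns field names in ascending field-number order. The index in the
    // returned list stands in for the physical encoding position.
    static List<String> encodingOrder(Map<String, Integer> fieldNumbers) {
        List<Map.Entry<String, Integer>> entries =
            new ArrayList<>(fieldNumbers.entrySet());
        entries.sort(Map.Entry.comparingByValue());
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries) {
            names.add(entry.getKey());
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, Integer> oldJob = new LinkedHashMap<>();
        oldJob.put("fieldOne", 0);
        oldJob.put("fieldTwo", 50);

        // The new job adds a field whose number falls inside the gap.
        Map<String, Integer> newJob = new LinkedHashMap<>(oldJob);
        newJob.put("fieldThree", 1);

        List<String> before = encodingOrder(oldJob);
        List<String> after = encodingOrder(newJob);
        System.out.println("old position of fieldTwo: " + before.indexOf("fieldTwo"));
        System.out.println("new position of fieldTwo: " + after.indexOf("fieldTwo"));
    }
}
```

In this sketch fieldTwo moves from position 1 to position 2 once fieldThree is added, so bytes written by the old job would no longer line up with the fields the new job expects.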
>>
>> On Tue, Jan 11, 2022 at 1:55 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> Hi Reuven,
>>> In your patch I just dropped the check for gaps in the numbers and
>>> introduced gaps in the annotated field numbers in my beans. Generated
>>> schema seems to be consistent and still works across pipeline updates.
>>>
>>> Preconditions.checkState(
>>>              number == i,
>>>              "Expected field number "
>>>                  + i
>>>                  + " for field: "
>>>                  + type.getName()
>>>                  + " instead got "
>>>                  + number);
>>>
>>> Is there any reason why we are checking for gaps there?
>>>
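For readers without the patch at hand, the effect of a "number == i" check like the one quoted above can be approximated as follows. This is a hypothetical stand-alone sketch, not the code from the PR: it uses a plain IllegalStateException instead of Preconditions, and checkContiguous is an invented name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FieldNumberCheckDemo {
    // Sorts the declared field numbers and requires them to be exactly
    // 0, 1, 2, ... with no gaps and no duplicates.
    static void checkContiguous(List<Integer> fieldNumbers) {
        List<Integer> sorted = new ArrayList<>(fieldNumbers);
        Collections.sort(sorted);
        for (int i = 0; i < sorted.size(); i++) {
            int number = sorted.get(i);
            if (number != i) {
                throw new IllegalStateException(
                    "Expected field number " + i + " instead got " + number);
            }
        }
    }

    public static void main(String[] args) {
        checkContiguous(List.of(2, 0, 1)); // contiguous, passes
        try {
            checkContiguous(List.of(0, 50)); // gap between 0 and 50
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Dropping such a check lets numbers like 0 and 50 through, which is the situation described in the question above.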
>>> On Mon, Jan 10, 2022 at 5:37 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Gaurav,
>>>>>
>>>>> Can you try this out and tell me if it works for you?
>>>>> https://github.com/apache/beam/pull/16472
>>>>>
>>>>> This allows you to annotate all your getters with SchemaFieldNumber,
>>>>> to specify the order of fields.
>>>>>
>>>> Hi Reuven
>>>> I am able to build and try out your code and it seems to be working
>>>> fine so far. I will continue to do more testing with this build. But I do
>>>> see a small issue with the current implementation. In the documentation it
>>>> says there are no gaps allowed in those numbers which makes it hard to use
>>>> this on classes which inherit from another class which uses this
>>>> annotation.
>>>> In my case I have an input Bean and an output Bean that extends the
>>>> input Bean. I have to configure Coders for both of these Beans. The input
>>>> Bean today uses numbers 0-40, and the output Bean uses 41-46. If tomorrow I
>>>> add a new field to the input Bean, I will have to use 47 on that field to
>>>> avoid duplicates in the output Bean. The only solution to this problem I
>>>> can think of is to allow gaps in the numbers.
>>>>
>>>>
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Yes, timers will get carried over.
>>>>>>
>>>>>> On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> That DelegateCoder seems like a good idea to keep the DoFn code
>>>>>>> clean. In the short term I will perhaps use a JSON string as my
>>>>>>> intermediary and let Jackson do the serialization and
>>>>>>> deserialization. From what I have learned so far, Dataflow does some
>>>>>>> magic in the background to re-order fields to make the PCollection
>>>>>>> schema compatible with the previous job (if I use SchemaCoder), so I
>>>>>>> don't have to switch away from Java beans yet. I can use the
>>>>>>> DelegateCoder for my state spec and continue using SchemaCoder for
>>>>>>> PCollections.
>>>>>>>
>>>>>>> @Reuven
>>>>>>> Regarding timers in stateful DoFn, do they get carried over to the
>>>>>>> new job in Dataflow?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Gaurav,
>>>>>>>>
>>>>>>>> It looks like issues related to state evolution come up quite
>>>>>>>> often on this mailing list, and unfortunately they do not seem to be
>>>>>>>> covered in the Beam docs.
>>>>>>>>
>>>>>>>> I am not familiar with Google Dataflow runner, but the way I
>>>>>>>> achieved state evolution on Flink runner was a combo of:
>>>>>>>> 1 - version-controlled Avro schemas for generating POJOs (models)
>>>>>>>> which were used in stateful Beam DoFns
>>>>>>>> 2 - a generic Avro-generated POJO as an intermediary - ( schema:
>>>>>>>> string, payload: bytes )
>>>>>>>> 3 -
>>>>>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html
>>>>>>>> responsible for converting a model into an intermediary (2)
>>>>>>>>
>>>>>>>> This allowed adding and dropping nullable fields and making other
>>>>>>>> Avro forward-compatible changes without losing the state. The order of
>>>>>>>> fields also didn't matter in this setting. Unfortunately, this also
>>>>>>>> meant double serialization and deserialization. But I wasn't able to
>>>>>>>> come up with any better solution, given that there seemed to be no
>>>>>>>> public info available about the topic, or maybe due to my lack of
>>>>>>>> experience.
>>>>>>>>
>>>>>>>> Relevant mail threads:
>>>>>>>> https://www.mail-archive.com/[email protected]/msg07249.html
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Pavel Solomin
>>>>>>>>
>>>>>>>> Tel: +351 962 950 692 <+351%20962%20950%20692> | Skype:
>>>>>>>> pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately, as you've discovered, Dataflow's update only pays
>>>>>>>>>> attention to actual PCollections, and does not look at DoFn state
>>>>>>>>>> variables at all (historically, stateful DoFns did not yet exist when
>>>>>>>>>> Dataflow implemented its update support).
>>>>>>>>>>
>>>>>>>>>> You can file a feature request against Google to implement update
>>>>>>>>>> on state variables, as this is honestly a major feature gap. In the
>>>>>>>>>> meantime there might be some workarounds you can use until this
>>>>>>>>>> support is added, e.g.:
>>>>>>>>>>    - you could convert the data into a protocol buffer before
>>>>>>>>>> storing into state; ProtoCoder should handle new fields, and fields
>>>>>>>>>> in a protocol buffer have a deterministic order.
>>>>>>>>>>
>>>>>>>>> Is there an easy way to go from a Java object to a protocol buffer
>>>>>>>>> object? The only way I can think of is to first serialize the Java
>>>>>>>>> model to JSON and then deserialize the JSON into a protocol buffer
>>>>>>>>> object, and the same the other way around, from protobuf to Java
>>>>>>>>> object. While I was writing this I realized I can maybe store the JSON
>>>>>>>>> dump of my Java object in the state spec. That should also work.
>>>>>>>>>
>>>>>>>>>>    - you could explicitly use the Row object, although this will
>>>>>>>>>> again make your code more complex.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Another intermediate option - we could fairly easily add
>>>>>>>>>> a @SchemaFieldId annotation to Beam that would allow you to specify
>>>>>>>>>> the order of fields. You would have to annotate every getter,
>>>>>>>>>> something like this:
>>>>>>>>>>
>>>>>>>>>> @SchemaFieldId(0)
>>>>>>>>>> public T getV1();
>>>>>>>>>>
>>>>>>>>>> @SchemaFieldId(1)
>>>>>>>>>> public T getV2();
>>>>>>>>>>
>>>>>>>>> Yes please. If this is an easy change and If you can point me in
>>>>>>>>> the right direction I will be happy to submit a patch for this.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> etc.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> SerializableCoder can be tricky for updates, and can also be
>>>>>>>>>>> very inefficient.
>>>>>>>>>>>
>>>>>>>>>>> As you noticed, when inferring schemas from a Java class, the
>>>>>>>>>>> order of fields is non-deterministic (not much that we can do about
>>>>>>>>>>> this, as Java doesn't return the fields in a deterministic order).
>>>>>>>>>>> For regular PCollections, Dataflow (though currently only Dataflow)
>>>>>>>>>>> will allow update even though the field orders have changed.
>>>>>>>>>>> Unfortunately
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I now have SchemaCoder for all data flowing through
>>>>>>>>>>>> the pipeline.
>>>>>>>>>>>> I am noticing another issue once a new pipeline is started with
>>>>>>>>>>>> the --update flag. In my stateful DoFn, where I am doing
>>>>>>>>>>>> prevState.read(), exceptions are being thrown, with errors like:
>>>>>>>>>>>> Caused by: java.io.EOFException: reached end of stream after
>>>>>>>>>>>> reading 68 bytes; 101 bytes expected
>>>>>>>>>>>> In this case there was no change in code or data model when
>>>>>>>>>>>> relaunching the pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> It seems that the Schema for my Bean is changing on every run of
>>>>>>>>>>>> the pipeline. I manually inspected the Schema returned by the
>>>>>>>>>>>> SchemaRegistry by running the pipeline a few times locally, and
>>>>>>>>>>>> each time I noticed the order of fields in the Schema was different.
>>>>>>>>>>>> This changing order of fields is breaking the new pipeline. Is there
>>>>>>>>>>>> a way to enforce some kind of ordering on the fields?
>>>>>>>>>>>>
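Java's reflection API documents that getDeclaredFields() and getDeclaredMethods() return their elements in no particular order. One way to get a reproducible order in a local experiment is to sort the discovered names explicitly. The sketch below does this with plain reflection; it is an illustration only, not how Beam's schema inference behaves, and ExampleBean and stableFieldNames are hypothetical names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class StableFieldOrderDemo {
    // Hypothetical bean; the fields are deliberately declared out of order.
    static class ExampleBean {
        public static final Integer SOME_CONSTANT_FIELD = 2;
        private String field2;
        private String field1;
        private Integer field3;
    }

    // Returns instance field names sorted alphabetically, so the result does
    // not depend on the order in which reflection happens to return them.
    static List<String> stableFieldNames(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            // Skip constants and compiler- or tool-generated fields.
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            names.add(field.getName());
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(stableFieldNames(ExampleBean.class));
    }
}
```

Alphabetical sorting is reproducible across runs, but renaming or adding a field can still change positions, which is why an explicit per-field number is the more robust option discussed in this thread.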
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Looking at the source code for JavaBeanSchema, it seems I will
>>>>>>>>>>>>> have to annotate my getters and setters with any flavour of
>>>>>>>>>>>>> @Nullable.
>>>>>>>>>>>>> Another question -
>>>>>>>>>>>>> does the Dataflow update pipeline feature work only with
>>>>>>>>>>>>> SchemaCoder? I have SerializableCoder and ProtoCoder being used in
>>>>>>>>>>>>> some of my other pipelines. I wonder if I have to change those too
>>>>>>>>>>>>> to open the door for updating pipelines there?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Some progress made.
>>>>>>>>>>>>>> Now I can see that the schema has all the needed fields. But all
>>>>>>>>>>>>>> fields in the returned schema are marked as "NOT NULL", even
>>>>>>>>>>>>>> though the fields are annotated with @javax.annotation.Nullable.
>>>>>>>>>>>>>> Is there a different flavor of @Nullable I need to use on my
>>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>>>>>>>>>>> DefaultSchema(JavaBeanSchema.class)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It looks something like this. It currently has around 40
>>>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>>>>>> @EqualsAndHashCode
>>>>>>>>>>>>>>>> @ToString
>>>>>>>>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>     private String field1;
>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>     private String field2;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @Nullable
>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>     private Integer field3;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     .....
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>     public String getField1() {
>>>>>>>>>>>>>>>>         return field1;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>>>>>>>>         this.field1 = field1;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>     public String getField2() {
>>>>>>>>>>>>>>>>         return field2;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>>>>>>>>         this.field2 = field2;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>     public Integer getField3() {
>>>>>>>>>>>>>>>>         return field3;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>>>>>>>>         this.field3 = field3;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public POJO() {
>>>>>>>>>>>>>>>>         // 0 arg empty constructor
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>>>>>>>>         // ... copy constructor
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> schemaRegistry.getSchema(dataType) is returning an empty
>>>>>>>>>>>>>>>>>> schema for me.
>>>>>>>>>>>>>>>>>> My Pojo is annotated with
>>>>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class).
>>>>>>>>>>>>>>>>>> Is there something extra I need to do here to register my
>>>>>>>>>>>>>>>>>> class with a schema registry?
>>>>>>>>>>>>>>>>>> Note: the code which is building the pipeline is sitting
>>>>>>>>>>>>>>>>>> in a library (different package) which is being imported into
>>>>>>>>>>>>>>>>>> my pipeline code. So perhaps some configuration is missing that
>>>>>>>>>>>>>>>>>> would allow the framework to discover my Pojo and the
>>>>>>>>>>>>>>>>>> annotations associated with it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I think I can make it work now. I found a utility
>>>>>>>>>>>>>>>>>>>>> method for building my coder from a class.
>>>>>>>>>>>>>>>>>>>>> Something like:
>>>>>>>>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>>>>>>>>>>>     SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I
>>>>>>>>>>>>>>>>>>>> said not work?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> No, annotation alone does not work since I am not using
>>>>>>>>>>>>>>>>>>> concrete classes in the code where the pipeline is being
>>>>>>>>>>>>>>>>>>> constructed. <Data> above is a template variable in the class
>>>>>>>>>>>>>>>>>>> which is constructing the pipeline.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> removing setCoder call breaks my pipeline.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> No Coder has been manually specified;  you may do so
>>>>>>>>>>>>>>>>>>>>>> using .setCoder().
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed:
>>>>>>>>>>>>>>>>>>>>>> Unable to provide a Coder for Data.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>   Building a Coder using a registered CoderProvider
>>>>>>>>>>>>>>>>>>>>>> failed.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The reason is that the code which builds the pipeline
>>>>>>>>>>>>>>>>>>>>>> is based on Java generics. The actual pipeline-building code
>>>>>>>>>>>>>>>>>>>>>> sets a bunch of parameters which are used to construct the
>>>>>>>>>>>>>>>>>>>>>> pipeline:
>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder);
>>>>>>>>>>>>>>>>>>>>>> So I guess I will need to provide some more
>>>>>>>>>>>>>>>>>>>>>> information to the framework to make the annotation work.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will
>>>>>>>>>>>>>>>>>>>>>>> usually automatically set up schema inference (you'll have
>>>>>>>>>>>>>>>>>>>>>>> to remove the setCoder call).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema
>>>>>>>>>>>>>>>>>>>>>>>> encoding?
>>>>>>>>>>>>>>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro
>>>>>>>>>>>>>>>>>>>>>>>>> coder. Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests if
>>>>>>>>>>>>>>>>>>>>>>>>>> incompatible changes are made to the Pojo.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <
>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the
>>>>>>>>>>>>>>>>>>>>>>>>>>> POJO before and after the change to ensure that they are
>>>>>>>>>>>>>>>>>>>>>>>>>>> compatible as you expect.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess, but I am
>>>>>>>>>>>>>>>>>>>>>>>>>>>> asking here in the hope that someone has faced a similar
>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem and can help.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update
>>>>>>>>>>>>>>>>>>>>>>>>>>>> a running Dataflow job. I am noticing that compatibility
>>>>>>>>>>>>>>>>>>>>>>>>>>>> checks fail any time I add a new field to my data model.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The error says:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for data and Avro coder to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding
>>>>>>>>>>>>>>>>>>>>>>>>>>>> new optional fields to the data should work when
>>>>>>>>>>>>>>>>>>>>>>>>>>>> updating the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation of the model to something which 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> allows me to update the pipeline in cases when I 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> add new optional fields to existing model. Any 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
