Also, when can I expect that annotation to be available for use? I assume that if the PR is accepted, 2.37.0 will be the first release where I can use it, right?
On Tue, Jan 11, 2022 at 3:58 PM gaurav mishra <[email protected]> wrote:

That's a valid point. I can live with the current solution for now, and hope that Dataflow will add that support soon.

Another question regarding the JavaBeanSchema implementation: I noticed we are using clazz.getDeclaredMethods() when preparing the schema. This ignores the base class's getters. Is there a reason we are not using clazz.getMethods() instead?

On Tue, Jan 11, 2022 at 3:24 PM Reuven Lax <[email protected]> wrote:

What matters is the physical order of the fields in the schema encoding. If you add gaps, you'll be fine initially (since the code I added simply sorts by the FieldNumber); however, you'll have problems on update.

For example, say you have the following:

    @SchemaFieldNumber("0") String fieldOne;
    @SchemaFieldNumber("50") String fieldTwo;

If you remove the check, things will appear just fine. However, if you then add a new field number in that gap, you'll have problems:

    @SchemaFieldNumber("0") String fieldOne;
    @SchemaFieldNumber("1") String fieldThree;
    @SchemaFieldNumber("50") String fieldTwo;

Now fieldThree will be expected to be the second field encoded, and you won't be able to update.

Dataflow handles schema updates by keeping track of the full map of field name -> encoding position from the old job and telling the coders of the new job to maintain the old encoding positions; this forces all new fields to be added to the end of the encoding. I think the long-term solution here is for Dataflow to support this on state variables as well.

Reuven

On Tue, Jan 11, 2022 at 1:55 PM gaurav mishra <[email protected]> wrote:

Hi Reuven,
In your patch I just dropped the check for gaps in the numbers and introduced gaps in the annotated field numbers in my beans. The generated schema seems to be consistent and still works across pipeline updates.
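An aside on Reuven's gap example: the update hazard can be sketched in plain Java (hypothetical field names; this only mimics the sort-by-FieldNumber step, it is not Beam's implementation). Sorting tolerates gaps, but a field added inside a gap shifts the encoding position of every later field:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldNumberGapDemo {
    // Sort fields by their annotated number and return the resulting
    // physical encoding order (mirrors the "sorts by the FieldNumber" step).
    public static List<String> encodingOrder(Map<String, Integer> fieldNumbers) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(fieldNumbers.entrySet());
        entries.sort(Map.Entry.comparingByValue());
        List<String> order = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries) {
            order.add(e.getKey());
        }
        return order;
    }

    public static void main(String[] args) {
        // Original job: a gap between 0 and 50 looks harmless.
        Map<String, Integer> v1 = new LinkedHashMap<>();
        v1.put("fieldOne", 0);
        v1.put("fieldTwo", 50);
        System.out.println(encodingOrder(v1)); // [fieldOne, fieldTwo]

        // Updated job: fieldThree lands in the gap and takes over
        // fieldTwo's encoding position, so the update cannot succeed.
        Map<String, Integer> v2 = new LinkedHashMap<>(v1);
        v2.put("fieldThree", 1);
        System.out.println(encodingOrder(v2)); // [fieldOne, fieldThree, fieldTwo]
    }
}
```

In the first version fieldTwo encodes at position 1; after adding fieldThree it encodes at position 2, which is exactly the incompatibility the contiguity check guards against.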
    Preconditions.checkState(
        number == i,
        "Expected field number "
            + i
            + " for field: "
            + type.getName()
            + " instead got "
            + number);

Is there any reason why we are checking for gaps there?

On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:

Gaurav,

Can you try this out and tell me if it works for you?
https://github.com/apache/beam/pull/16472

This allows you to annotate all your getters with SchemaFieldNumber, to specify the order of fields.

On Mon, Jan 10, 2022 at 5:37 PM gaurav mishra <[email protected]> wrote:

Hi Reuven,
I was able to build and try out your code, and it seems to be working fine so far. I will continue to do more testing with this build. But I do see a small issue with the current implementation: the documentation says no gaps are allowed in those numbers, which makes it hard to use this annotation on classes that inherit from another class which also uses it.
In my case I have an input bean and an output bean that extends the input bean, and I have to configure coders for both. The input bean today uses numbers 0-40, and the output bean uses 41-46. If tomorrow I add a new field to the input bean, I will have to use 47 on that field to avoid duplicates in the output bean. The only way around this problem I can think of is to allow gaps in the numbers.

On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:

Yes, timers will get carried over.

On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <[email protected]> wrote:

That DelegateCoder seems like a good idea to keep the DoFn code clean.
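As an editorial aside on the getDeclaredMethods() question earlier in the thread: the difference is easy to confirm with a stand-alone reflection check (hypothetical Base/Derived classes; plain Java, not Beam code). getDeclaredMethods() omits inherited getters, while getMethods() includes all public methods, inherited ones included:

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class DeclaredVsAllMethodsDemo {
    public static class Base {
        public String getBaseField() { return "base"; }
    }

    public static class Derived extends Base {
        public String getDerivedField() { return "derived"; }
    }

    // True if any method in the array has the given name.
    public static boolean hasMethod(Method[] methods, String name) {
        return Arrays.stream(methods).anyMatch(m -> m.getName().equals(name));
    }

    public static void main(String[] args) {
        // getDeclaredMethods(): only methods declared directly on Derived.
        Method[] declared = Derived.class.getDeclaredMethods();
        System.out.println(hasMethod(declared, "getDerivedField")); // true
        System.out.println(hasMethod(declared, "getBaseField"));    // false

        // getMethods(): all public methods, including inherited ones.
        Method[] all = Derived.class.getMethods();
        System.out.println(hasMethod(all, "getBaseField")); // true
    }
}
```

So a schema built from getDeclaredMethods() alone would indeed miss base-class getters, as observed.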
In the short term I will perhaps use a JSON string as my intermediary and let Jackson do the serialization and deserialization. What I have learned so far is that Dataflow does some magic in the background to reorder fields to make the PCollection schema compatible with the previous job (if I use SchemaCoder), so I don't have to switch away from Java beans yet. I can use the DelegateCoder for my StateSpec and continue using SchemaCoder for PCollections.

@Reuven
Regarding timers in a stateful DoFn, do they get carried over to the new job in Dataflow?

On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]> wrote:

Hello Gaurav,

It looks like issues related to state evolution come up quite often on this mailing list, and unfortunately they do not seem to be covered in the Beam docs.

I am not familiar with the Google Dataflow runner, but the way I achieved state evolution on the Flink runner was a combination of:
1 - version-controlled Avro schemas for generating POJOs (models) which were used in stateful Beam DoFns
2 - a generic Avro-generated POJO as an intermediary - ( schema: string, payload: bytes )
3 - https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html responsible for converting a model into the intermediary (2)

This allowed adding and dropping nullable fields, and making other Avro forward-compatible changes, without losing state. The order of fields also didn't matter in this setting. It did, however, mean double serialization and deserialization, unfortunately.
But I wasn't able to come up with any better solution, given that there seemed to be no public information available about the topic - or maybe that was due to my lack of experience.

Relevant mail threads:
https://www.mail-archive.com/[email protected]/msg07249.html

Best Regards,
Pavel Solomin
Tel: +351 962 950 692 | Skype: pavel_solomin | LinkedIn: https://www.linkedin.com/in/pavelsolomin

On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:

Unfortunately, as you've discovered, Dataflow's update only pays attention to actual PCollections and does not look at DoFn state variables at all (historically, stateful DoFns did not yet exist when Dataflow implemented its update support).

You can file a feature request against Google to implement update on state variables, as this is honestly a major feature gap. In the meantime there are some workarounds you can use until this support is added, e.g.:
- you could convert the data into a protocol buffer before storing it into state; ProtoCoder should handle new fields, and fields in a protocol buffer have a deterministic order.

On Mon, 10 Jan 2022 at 08:43, gaurav mishra <[email protected]> wrote:

Is there an easy way to go from a Java object to a protocol buffer object? The only way I can think of is to first serialize the Java model to JSON and then deserialize the JSON into a protobuf object - and the same in the other direction, protobuf to Java object.
While I was writing this, I realized I can maybe store the JSON dump of my Java object in the state spec. That should also work.

Reuven Lax, in the same message, also suggested:

- you could explicitly use the Row object, although this will again make your code more complex.
- another intermediate option: we could fairly easily add a @SchemaFieldId annotation to Beam that would allow you to specify the order of fields. You would have to annotate every getter, something like this:

      @SchemaFieldId(0)
      public T getV1();

      @SchemaFieldId(1)
      public T getV2();

  etc.

gaurav mishra replied: Yes please. If this is an easy change, and if you can point me in the right direction, I will be happy to submit a patch for this.

On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:

SerializableCoder can be tricky for updates, and can also be very inefficient.

As you noticed, when inferring schemas from a Java class, the order of fields is non-deterministic (not much we can do about this, as Java doesn't return the fields in a deterministic order). For regular PCollections, Dataflow (though currently only Dataflow) will allow an update even though the field orders have changed. Unfortunately

On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <[email protected]> wrote:

I now have SchemaCoder for all data flowing through the pipeline. I am noticing another issue once a new pipeline is started with the --update flag.
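For context on the nondeterministic field order mentioned above: Class.getDeclaredMethods() makes no ordering guarantee, so the reflected getters can come back in a different order on every JVM run. A plain-Java sketch (not Beam's implementation) of the usual fix - sort the reflected getters by name before building anything order-sensitive:

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DeterministicGetterOrderDemo {
    // Hypothetical bean with getters declared in non-alphabetical order.
    public static class Bean {
        public String getZeta() { return "z"; }
        public String getAlpha() { return "a"; }
        public String getMiddle() { return "m"; }
    }

    // Collect getter names and sort them, so the result no longer depends
    // on whatever order the JVM happens to return reflected methods in.
    public static List<String> orderedGetterNames(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Method m : clazz.getDeclaredMethods()) {
            if (m.getName().startsWith("get") && m.getParameterCount() == 0) {
                names.add(m.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        // Always [getAlpha, getMiddle, getZeta], regardless of reflection order.
        System.out.println(orderedGetterNames(Bean.class));
    }
}
```

Sorting by name gives stability but, unlike an explicit field-number annotation, it silently changes encoding positions when a getter is renamed or added, which is why the annotation route discussed in this thread is preferable.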
In my stateful DoFn, where I am doing prevState.read(), exceptions are being thrown - errors like:

    Caused by: java.io.EOFException: reached end of stream after reading 68 bytes; 101 bytes expected

In this case there was no change in code or data model when relaunching the pipeline.

It seems that the schema for my bean is changing on every run of the pipeline. I manually inspected the schema returned by the SchemaRegistry by running the pipeline a few times locally, and each time the order of fields in the schema was different. This changing order of fields is breaking the new pipeline. Is there a way to enforce some kind of ordering on the fields?

On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <[email protected]> wrote:

Looking at the source code for JavaBeanSchema, it seems I will have to annotate my getters and setters with some flavour of @Nullable.
Another question - does the Dataflow update-pipeline feature work only with SchemaCoder? I have SerializableCoder and ProtoCoder being used in some of my other pipelines. I wonder if I have to change those too, to open the door to updating those pipelines?

On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]> wrote:

Some progress made. Now I can see that the schema has all the needed fields, but all fields in the returned schema are marked as "NOT NULL", even though the fields are annotated with @javax.annotation.Nullable.
Is there a different flavor of @Nullable I need to use on my fields?

On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:

Ah, this isn't a POJO then. In this case, try using @DefaultSchema(JavaBeanSchema.class).

On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:

It looks something like this. It currently has around 40 fields.

    @JsonIgnoreProperties(ignoreUnknown = true)
    @DefaultSchema(JavaFieldSchema.class)
    @EqualsAndHashCode
    @ToString
    public class POJO implements SomeInterface, Serializable {

        public static final Integer SOME_CONSTANT_FIELD = 2;

        @JsonProperty("field_1")
        private String field1;

        @JsonProperty("field_2")
        private String field2;

        @Nullable
        @JsonProperty("field_3")
        private Integer field3;

        .....

        @JsonProperty("field_1")
        public String getField1() {
            return field1;
        }

        @JsonProperty("field_1")
        public void setField1(String field1) {
            this.field1 = field1;
        }

        @JsonProperty("field_2")
        public String getField2() {
            return field2;
        }

        @JsonProperty("field_2")
        public void setField2(String field2) {
            this.field2 = field2;
        }

        @JsonProperty("field_3")
        public Integer getField3() {
            return field3;
        }

        @JsonProperty("field_3")
        public void setField3(Integer field3) {
            this.field3 = field3;
        }

        public POJO() {
            // 0-arg empty constructor
        }

        public POJO(POJO other) {
            // ... copy constructor
        }
    }

On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:

Can you paste the code for your POJO?

On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:

schemaRegistry.getSchema(dataType) is returning an empty schema for me. My POJO is annotated with @DefaultSchema(JavaFieldSchema.class). Is there something extra I need to do here to register my class with the schema registry?
Note: the code which builds the pipeline sits in a library (a different package) which is imported into my pipeline code. So perhaps there is some missing configuration that would allow the framework to discover my POJO and the annotations associated with it.

On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:

I think I can make it work now. I found a utility method for building my coder from a class. Something like:

    Class<Data> dataClass = userConfig.getDataClass();
    Coder<Data> dataCoder =
        SchemaCoder.of(
            schemaRegistry.getSchema(dataClass),
            TypeDescriptor.of(dataClass),
            schemaRegistry.getToRowFunction(dataClass),
            schemaRegistry.getFromRowFunction(dataClass));

On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:

This will work. Though, did annotating the POJO like I said not work?

On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:

No, annotation alone does not work, since I am not using concrete classes in the code where the pipeline is constructed. <Data> above is a type parameter of the class which constructs the pipeline.

On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:

Removing the setCoder call breaks my pipeline:

    No Coder has been manually specified; you may do so using .setCoder().
    Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for Data.
    Building a Coder using a registered CoderProvider failed.

The reason is that the code which builds the pipeline is based on Java generics. The actual pipeline-building code sets a bunch of parameters which are used to construct the pipeline:

    PCollection<Data> stream = pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)

So I guess I will need to provide some more information to the framework to make the annotation work.

On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:

If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class), that will usually set up schema inference automatically (you'll have to remove the setCoder call).
On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:

How do I set up my pipeline to use Beam's schema encoding? In my current code I am doing something like this:

    PCollection<Data> stream = pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))

On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:

I don't think we make any guarantees about AvroCoder. Can you use Beam's schema encoding instead?

On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:

Is there a way to programmatically check for compatibility? I would like to fail my unit tests if incompatible changes are made to the POJO.

On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:

Check the schema of the Avro encoding for the POJO before and after the change, to ensure that they are compatible as you expect.
On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:

This is more of a Dataflow question, I guess, but I am asking here in hopes that someone has faced a similar problem and can help.
I am trying to use the "--update" option to update a running Dataflow job. I am noticing that compatibility checks fail any time I add a new field to my data model. The error says:

    The Coder or type for step XYZ has changed

I am using a Java POJO for the data and AvroCoder to serialize the model. I read somewhere that adding new optional fields to the data should work when updating the pipeline.

I am fine with changing the coder or the implementation of the model to something which allows me to update the pipeline when I add new optional fields to an existing model. Any suggestions?
