On Tue, Jan 11, 2022 at 4:04 PM Reuven Lax <[email protected]> wrote:
> Probably a bug - JavaFieldSchema does walk up the inheritance tree (see
> ReflectUtils.getFields)

Should I open a Jira for this?
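For reference, the difference being reported: clazz.getDeclaredMethods() returns only the methods declared directly on the class, while clazz.getMethods() also returns inherited public methods. A minimal illustration (class names hypothetical):

    import java.lang.reflect.Method;

    class Base {
        public String getParentField() { return "parent"; }
    }

    class Derived extends Base {
        public String getChildField() { return "child"; }
    }

    public class ReflectionDemo {
        public static void main(String[] args) {
            // Prints only getChildField: inherited getters are invisible here,
            // which is why a schema built this way drops base-class fields.
            for (Method m : Derived.class.getDeclaredMethods()) {
                System.out.println("declared: " + m.getName());
            }
            // Includes getParentField (plus Object's public methods, which a
            // schema provider would need to filter out).
            for (Method m : Derived.class.getMethods()) {
                System.out.println("public:   " + m.getName());
            }
        }
    }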
> Do you have a github account? If so I can add you as a reviewer on this PR.
>
> On Tue, Jan 11, 2022 at 3:58 PM gaurav mishra <[email protected]> wrote:
>
>> That's a valid point. I can live with the current solution for now, and hope that Dataflow will add that support soon.
>>
>> Another question regarding the JavaBeanSchema implementation: I noticed we are using clazz.getDeclaredMethods() for preparing the schema. This ignores the base class's getters. Is there a reason we are not using clazz.getMethods() for preparing the schema?
>>
>> On Tue, Jan 11, 2022 at 3:24 PM Reuven Lax <[email protected]> wrote:
>>
>>> What matters is the physical order of the fields in the schema encoding. If you add gaps, you'll be fine initially (since the code I added simply sorts by the FieldNumber); however, you'll have problems on update.
>>>
>>> For example, let's say you have the following:
>>>
>>> @SchemaFieldNumber("0") String fieldOne;
>>> @SchemaFieldNumber("50") String fieldTwo;
>>>
>>> If you remove the check, things will appear just fine. However, if you then add a new field number in that gap, you'll have problems:
>>>
>>> @SchemaFieldNumber("0") String fieldOne;
>>> @SchemaFieldNumber("1") String fieldThree;
>>> @SchemaFieldNumber("50") String fieldTwo;
>>>
>>> Now fieldThree will be expected to be the second field encoded, and you won't be able to update.
>>>
>>> Dataflow handles schema updates by keeping track of the full map of field name -> encoding position from the old job and telling the coders of the new job to maintain the old encoding positions; this forces all new fields to be added to the end of the encoding. I think the long-term solution here is for Dataflow to support this on state variables as well.
>>>
>>> Reuven
>>>
>>> On Tue, Jan 11, 2022 at 1:55 PM gaurav mishra <[email protected]> wrote:
>>>
>>>> Hi Reuven,
>>>> In your patch I just dropped the check for gaps in the numbers and introduced gaps in the annotated field numbers in my beans. The generated schema seems to be consistent and still works across pipeline updates.
>>>>
>>>> Preconditions.checkState(
>>>>     number == i,
>>>>     "Expected field number "
>>>>         + i
>>>>         + " for field: "
>>>>         + type.getName()
>>>>         + " instead got "
>>>>         + number);
>>>>
>>>> Is there any reason why we are checking for gaps there?
>>>>
>>>> On Mon, Jan 10, 2022 at 5:37 PM gaurav mishra <[email protected]> wrote:
>>>>
>>>>> On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Gaurav,
>>>>>>
>>>>>> Can you try this out and tell me if it works for you? https://github.com/apache/beam/pull/16472
>>>>>>
>>>>>> This allows you to annotate all your getters with SchemaFieldNumber, to specify the order of fields.
>>>>>
>>>>> Hi Reuven,
>>>>> I am able to build and try out your code and it seems to be working fine so far. I will continue to do more testing with this build. But I do see a small issue with the current implementation: the documentation says no gaps are allowed in those numbers, which makes it hard to use this on classes that inherit from another class that uses this annotation.
>>>>> In my case I have an input Bean and an output Bean that extends the input Bean. I have to configure coders for both of these Beans. The input Bean today uses numbers 0-40, and the output Bean uses 41-46. If tomorrow I add a new field to the input Bean, then I will have to use 47 on that field to avoid duplicates in the output Bean. The only solution around this problem I can think of is to allow for gaps in the numbers.
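As an illustration of the annotation from that PR, a sketch of what such a bean might look like (bean and field names hypothetical; per the check discussed above, the numbers must be contiguous and start at 0):

    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;

    @DefaultSchema(JavaBeanSchema.class)
    public class InputBean {
        private String id;
        private Integer count;

        // The explicit numbers pin the encoding order, so it no longer
        // depends on the non-deterministic order of reflected methods.
        @SchemaFieldNumber("0")
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        @SchemaFieldNumber("1")
        public Integer getCount() { return count; }
        public void setCount(Integer count) { this.count = count; }
    }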
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Yes, timers will get carried over.
>>>>>>>
>>>>>>> On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <[email protected]> wrote:
>>>>>>>
>>>>>>>> That DelegateCoder seems like a good idea to keep the DoFn code clean. In the short term I will perhaps use a JSON string as my intermediary and let Jackson do the serialization and deserialization. From what I have learned so far, Dataflow does some magic in the background to re-order fields to make the PCollection schema compatible with the previous job (if I use SchemaCoder), so I don't have to switch away from Java beans yet. I can use the DelegateCoder for my StateSpec and continue using SchemaCoder for PCollections.
>>>>>>>>
>>>>>>>> @Reuven
>>>>>>>> Regarding timers in a stateful DoFn, do they get carried over to the new job in Dataflow?
>>>>>>>>
>>>>>>>> On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello Gaurav,
>>>>>>>>>
>>>>>>>>> It looks like issues related to state evolution come up quite often on this mailing list, and they do not seem to be covered in the Beam docs, unfortunately.
>>>>>>>>>
>>>>>>>>> I am not familiar with the Google Dataflow runner, but the way I achieved state evolution on the Flink runner was a combo of:
>>>>>>>>> 1 - version-controlled Avro schemas for generating POJOs (models) which were used in stateful Beam DoFns
>>>>>>>>> 2 - a generic Avro-generated POJO as an intermediary - (schema: string, payload: bytes)
>>>>>>>>> 3 - https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html responsible for converting a model into an intermediary (2)
>>>>>>>>>
>>>>>>>>> This allowed adding and dropping nullable fields and making other Avro forward-compatible changes without losing the state. The order of fields also didn't matter in this setting. It did mean double serialization and deserialization, unfortunately, but I wasn't able to come up with a better solution, given that there seemed to be no public info available about the topic - or maybe that was due to my lack of experience.
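A minimal sketch of the DelegateCoder approach with the JSON-string intermediary discussed above (assumes Jackson on the classpath; MyState is a hypothetical state model class):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.DelegateCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    public class JsonStateCoder {
        // ObjectMapper is thread-safe once configured.
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Encodes state as a JSON string: field order stops mattering, and
        // unknown/missing fields can be tolerated via Jackson configuration.
        public static Coder<MyState> of() {
            return DelegateCoder.of(
                StringUtf8Coder.of(),
                (MyState state) -> MAPPER.writeValueAsString(state),
                (String json) -> MAPPER.readValue(json, MyState.class));
        }
    }

The resulting coder can then back the state cell, e.g. StateSpecs.value(JsonStateCoder.of()).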
>>>>>>>>> Relevant mail threads:
>>>>>>>>> https://www.mail-archive.com/[email protected]/msg07249.html
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>> Pavel Solomin
>>>>>>>>>
>>>>>>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>>>>>>>>
>>>>>>>>> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Unfortunately, as you've discovered, Dataflow's update only pays attention to actual PCollections, and does not look at DoFn state variables at all (historically, stateful DoFns did not yet exist when Dataflow implemented its update support).
>>>>>>>>>>>
>>>>>>>>>>> You can file a feature request against Google to implement update on state variables, as this is honestly a major feature gap. In the meantime there might be some workarounds you can use until this support is added, e.g.:
>>>>>>>>>>> - you could convert the data into a protocol buffer before storing it into state; ProtoCoder should handle new fields, and fields in a protocol buffer have a deterministic order.
>>>>>>>>>>
>>>>>>>>>> Is there an easy way to go from a Java object to a protocol buffer object? The only way I can think of is to first serialize the Java model to JSON and then deserialize the JSON into a protobuf object, and the same for the other way around - protobuf to Java object (see the sketch below). While I was writing this I realized I can maybe store the JSON dump of my Java object in the state spec. That should also work.
>>>>>>>>>>
>>>>>>>>>>> - you could explicitly use the Row object, although this will again make your code more complex.
>>>>>>>>>>>
>>>>>>>>>>> Another intermediate option - we could fairly easily add a @SchemaFieldId annotation to Beam that would allow you to specify the order of fields. You would have to annotate every getter, something like this:
>>>>>>>>>>>
>>>>>>>>>>> @SchemaFieldId(0)
>>>>>>>>>>> public T getV1();
>>>>>>>>>>>
>>>>>>>>>>> @SchemaFieldId(1)
>>>>>>>>>>> public T getV2();
>>>>>>>>>>
>>>>>>>>>> Yes please. If this is an easy change, and if you can point me in the right direction, I will be happy to submit a patch for this.
>>>>>>>>>>
>>>>>>>>>>> etc.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> SerializableCoder can be tricky for updates, and can also be very inefficient.
>>>>>>>>>>>>
>>>>>>>>>>>> As you noticed, when inferring schemas from a Java class, the order of fields is non-deterministic (not much that we can do about this, as Java doesn't return the fields in a deterministic order). For regular PCollections, Dataflow (though currently only Dataflow) will allow update even though the field orders have changed. Unfortunately
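A rough sketch of the POJO -> JSON -> proto round-trip mentioned above (MyState and MyStateProto are hypothetical names; it assumes the POJO's JSON property names line up with the proto fields' JSON names):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.protobuf.util.JsonFormat;

    public class ProtoStateConverter {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // POJO -> JSON -> proto. ignoringUnknownFields() lets the proto
        // tolerate JSON produced by a newer version of the POJO.
        public static MyStateProto toProto(MyState pojo) throws Exception {
            String json = MAPPER.writeValueAsString(pojo);
            MyStateProto.Builder builder = MyStateProto.newBuilder();
            JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
            return builder.build();
        }

        // proto -> JSON -> POJO, the reverse direction.
        public static MyState fromProto(MyStateProto proto) throws Exception {
            String json = JsonFormat.printer().print(proto);
            return MAPPER.readValue(json, MyState.class);
        }
    }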
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I now have SchemaCoder for all data flowing through the pipeline.
>>>>>>>>>>>>> I am noticing another issue once a new pipeline is started with the --update flag. In my stateful DoFn, where I am doing prevState.read(), exceptions are being thrown - errors like:
>>>>>>>>>>>>> Caused by: java.io.EOFException: reached end of stream after reading 68 bytes; 101 bytes expected
>>>>>>>>>>>>> In this case there was no change in code or data model when relaunching the pipeline.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It seems that the Schema for my Bean is changing on every run of the pipeline. I manually inspected the Schema returned by the SchemaRegistry by running the pipeline a few times locally, and each time I noticed the order of fields in the Schema was different. This changing of the order of fields is breaking the new pipeline. Is there a way to enforce some kind of ordering on the fields?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking at the source code for JavaBeanSchema, it seems I will have to annotate my getters and setters with some flavour of @Nullable.
>>>>>>>>>>>>>> Another question - does the Dataflow update-pipeline feature work only with SchemaCoder? I have SerializableCoder and ProtoCoder being used in some of my other pipelines. I wonder if I have to change those too to open the door for updating pipelines there?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Some progress made.
>>>>>>>>>>>>>>> Now I can see that the schema has all the needed fields, but all fields in the returned schema are marked as "NOT NULL", even though the fields are annotated with @javax.annotation.Nullable.
>>>>>>>>>>>>>>> Is there a different flavor of @Nullable I need to use on my fields?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ah, this isn't a POJO then. In this case, try using @DefaultSchema(JavaBeanSchema.class)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>>>>>>> @EqualsAndHashCode
>>>>>>>>>>>>>>>>> @ToString
>>>>>>>>>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>     private String field1;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>     private String field2;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @Nullable
>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>     private Integer field3;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     // .....
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>     public String getField1() {
>>>>>>>>>>>>>>>>>         return field1;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>>>>>>>>>         this.field1 = field1;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>     public String getField2() {
>>>>>>>>>>>>>>>>>         return field2;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>>>>>>>>>         this.field2 = field2;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>     public Integer getField3() {
>>>>>>>>>>>>>>>>>         return field3;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>>>>>>>>>         this.field3 = field3;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     public POJO() {
>>>>>>>>>>>>>>>>>         // 0-arg empty constructor
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>>>>>>>>>         // ... copy constructor
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning an empty schema. My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>>>>>>>>>>>>>>>> Is there something extra I need to do here to register my class with the schema registry?
>>>>>>>>>>>>>>>>>>> Note: the code which is building the pipeline sits in a library (different package) which is imported into my pipeline code, so perhaps there is some missing configuration which would allow the framework to discover my Pojo and the annotations associated with it.
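Per gaurav's reading of the JavaBeanSchema source earlier in the thread, for getter/setter-based inference the @Nullable annotation goes on the accessors rather than only on the field. A hedged sketch of what that might look like for one of the fields above:

    import javax.annotation.Nullable;
    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    @DefaultSchema(JavaBeanSchema.class)
    public class Bean implements java.io.Serializable {
        @Nullable private Integer field3;

        // Annotating the getter (and the setter parameter) is what marks the
        // schema field as nullable when inference walks the accessors.
        @Nullable
        public Integer getField3() { return field3; }

        public void setField3(@Nullable Integer field3) { this.field3 = field3; }
    }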
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think I can make it work now. I found a utility method for building my coder from the class. Something like:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>>>>>>>>>>>>     SchemaCoder.of(
>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>>>>>>>>>         TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I said not work?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> No, the annotation alone does not work, since I am not using concrete classes in the code where the pipeline is being constructed. <Data> above is a type variable in the class which is constructing the pipeline.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>>>>>>>>>>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for Data.
>>>>>>>>>>>>>>>>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The reason is that the code which builds the pipeline is based on Java generics. The actual pipeline-building code sets a bunch of parameters which are used to construct the pipeline:
>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream = pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>>>>>>>>>>>> So I guess I will need to provide some more information to the framework to make the annotation work.
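That utility generalizes nicely; a self-contained sketch with the checked exception surfaced (the helper name is illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.schemas.NoSuchSchemaException;
    import org.apache.beam.sdk.schemas.SchemaCoder;
    import org.apache.beam.sdk.schemas.SchemaRegistry;
    import org.apache.beam.sdk.values.TypeDescriptor;

    public class SchemaCoders {
        // Builds a SchemaCoder for a schema-annotated class. This works even
        // when the element type is only known through a generic parameter,
        // where automatic coder inference fails.
        public static <T> Coder<T> schemaCoderFor(Pipeline pipeline, Class<T> clazz)
                throws NoSuchSchemaException {
            SchemaRegistry registry = pipeline.getSchemaRegistry();
            return SchemaCoder.of(
                registry.getSchema(clazz),
                TypeDescriptor.of(clazz),
                registry.getToRowFunction(clazz),
                registry.getFromRowFunction(clazz));
        }
    }

The result can then be passed to setCoder(...) in the generic pipeline-building code.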
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class), that will usually automatically set up schema inference (you'll have to remove the setCoder call).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding? In my current code I am doing something like this:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> = pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about AvroCoder. Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for compatibility? I would like to fail my unit tests if incompatible changes are made to the Pojo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before and after the change to ensure that they are compatible as you expect.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess, but I am asking here in hopes someone has faced a similar problem and can help. I am trying to use the "--update" option to update a running Dataflow job. I am noticing that the compatibility checks fail any time I add a new field to my data model. The error says:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for the data and an Avro coder to serialize the model. I read somewhere that adding new optional fields to the data should work when updating the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with changing the coder or the implementation of the model to something which allows me to update the pipeline when I add new optional fields to the existing model. Any suggestions?
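Following Luke's suggestion, a hedged sketch of such a unit test: it checks the POJO's current Avro schema against a snapshot of the schema the running job was built with (the resource path is hypothetical, and Dataflow's own update check may still be stricter than Avro-level compatibility):

    import static org.junit.Assert.assertEquals;

    import java.io.InputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaCompatibility;
    import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.junit.Test;

    public class PojoSchemaCompatibilityTest {
        @Test
        public void newSchemaCanReadOldData() throws Exception {
            // Schema the running job used to write state, snapshotted into resources.
            try (InputStream old = getClass().getResourceAsStream("/schemas/pojo-v1.avsc")) {
                Schema writerSchema = new Schema.Parser().parse(old);
                // Schema the updated job will use to read.
                Schema readerSchema = AvroCoder.of(POJO.class).getSchema();

                SchemaCompatibility.SchemaPairCompatibility result =
                    SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
                assertEquals(SchemaCompatibilityType.COMPATIBLE, result.getType());
            }
        }
    }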