Hi Reuven, can you please point me to some instructions on how to publish artifacts to local Maven? `./gradlew publishToMavenLocal` does not seem to be working; I don't see 2.37.0-SNAPSHOT in my local Maven repository.
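For reference, a hedged sketch of the publish-and-verify steps. The `-Ppublishing` property is an assumption about Beam's Gradle setup (verify against the build scripts); the `ls` step simply checks whether the snapshot actually landed in the local repository:

```java
// Not Java: terminal commands, shown here as a sketch.
//
//   # Assumption: Beam's build may gate Maven publications behind a
//   # publishing property; check the Beam build scripts to confirm.
//   ./gradlew -Ppublishing publishToMavenLocal
//
//   # Verify the 2.37.0-SNAPSHOT artifacts landed in the local repo:
//   ls ~/.m2/repository/org/apache/beam/beam-sdks-java-core/2.37.0-SNAPSHOT/
```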
On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:

Gaurav,

Can you try this out and tell me if it works for you?
https://github.com/apache/beam/pull/16472

This allows you to annotate all your getters with SchemaFieldNumber, to specify the order of fields.

Reuven

On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:

Yes, timers will get carried over.

On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <[email protected]> wrote:

That DelegateCoder seems like a good idea to keep the DoFn code clean. In the short term I will perhaps use a JSON string as my intermediary and let Jackson do the serialization and deserialization. From what I have learned so far, Dataflow does some magic in the background to re-order fields to make the PCollection schema compatible with the previous job (if I use SchemaCoder), so I don't have to switch away from Java beans yet. I can use the DelegateCoder for my StateSpec and continue using SchemaCoder for PCollections.

@Reuven
Regarding timers in a stateful DoFn, do they get carried over to the new job in Dataflow?

On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]> wrote:

Hello Gaurav,

It looks like issues related to state evolution come up quite often on this mailing list, and they do not seem to be covered in the Beam docs, unfortunately.
I am not familiar with the Google Dataflow runner, but the way I achieved state evolution on the Flink runner was a combination of:

1 - version-controlled Avro schemas for generating the POJOs (models) used in stateful Beam DoFns
2 - a generic Avro-generated POJO as an intermediary - (schema: string, payload: bytes)
3 - https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html responsible for converting a model into the intermediary (2)

This allowed adding and dropping nullable fields and making other Avro forward-compatible changes without losing the state. The order of fields also didn't matter in this setup. It did mean double serialization and deserialization, unfortunately, but I wasn't able to come up with a better solution, given that there seemed to be no public information available about the topic - or maybe that was due to my lack of experience.

Relevant mail threads:
https://www.mail-archive.com/[email protected]/msg07249.html

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin: https://www.linkedin.com/in/pavelsolomin

On Mon, 10 Jan 2022 at 08:43, gaurav mishra <[email protected]> wrote:

On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:

> Unfortunately, as you've discovered, Dataflow's update only pays attention to actual PCollections, and does not look at DoFn state variables at all (historically, stateful DoFns did not yet exist when Dataflow implemented its update support).
>
> You can file a feature request against Google to implement update on state variables, as this is honestly a major feature gap. In the meantime there might be some workarounds you can use until this support is added, e.g.:
> - You could convert the data into a protocol buffer before storing it into state; ProtoCoder should handle new fields, and fields in a protocol buffer have a deterministic order.

Is there an easy way to go from a Java object to a protocol buffer object? The only way I can think of is to first serialize the Java model to JSON and then deserialize the JSON into a protobuf object, and the same for the other way around - protobuf to Java object. While writing this I realized I can maybe store the JSON dump of my Java object in the StateSpec; that should also work.

> - You could explicitly use the Row object, although this will again make your code more complex.
>
> Another intermediate option - we could fairly easily add a @SchemaFieldId annotation to Beam that would allow you to specify the order of fields. You would have to annotate every getter, something like this:
>
>     @SchemaFieldId(0)
>     public T getV1();
>
>     @SchemaFieldId(1)
>     public T getV2();

Yes please. If this is an easy change and you can point me in the right direction, I will be happy to submit a patch for it.

> etc.

On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:

SerializableCoder can be tricky for updates, and can also be very inefficient.

As you noticed, when inferring schemas from a Java class, the order of fields is non-deterministic (not much we can do about this, as Java doesn't return the fields in a deterministic order). For regular PCollections, Dataflow (though currently only Dataflow) will allow update even though the field orders have changed.
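The field-order non-determinism Reuven mentions can be reproduced with nothing but the reflection API. A minimal stdlib-only sketch (the `Model` class here is a hypothetical stand-in for the POJO discussed in this thread):

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FieldOrderDemo {

    // Hypothetical model, standing in for the POJO in this thread.
    static class Model {
        String field1;
        String field2;
        Integer field3;
    }

    // Returns the declared field names in whatever order reflection yields.
    static List<String> declaredFieldNames(Class<?> clazz) {
        return Arrays.stream(clazz.getDeclaredFields())
                .map(Field::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The Javadoc of getDeclaredFields() states the returned array is
        // "not sorted and not in any particular order" - which is why
        // reflection-based schema inference cannot promise a stable field
        // order across JVMs or runs.
        System.out.println(declaredFieldNames(Model.class));
    }
}
```

The names always come back as the same set of three, but the order is an implementation detail of the JVM, which is exactly the update-incompatibility observed below.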
Unfortunately

On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <[email protected]> wrote:

I now have SchemaCoder for all data flowing through the pipeline. I am noticing another issue once a new pipeline is started with the --update flag. In my stateful DoFn, where I am doing prevState.read(), exceptions are being thrown, errors like:

    Caused by: java.io.EOFException: reached end of stream after reading 68 bytes; 101 bytes expected

In this case there was no change in code or data model when relaunching the pipeline.

It seems that the Schema for my bean is changing on every run of the pipeline. I manually inspected the Schema returned by the SchemaRegistry by running the pipeline a few times locally, and each time I noticed that the order of fields in the Schema was different. This changing order of fields is breaking the new pipeline. Is there a way to enforce some kind of ordering on the fields?

On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <[email protected]> wrote:

Looking at the source code for JavaBeanSchema, it seems I will have to annotate my getters and setters with some flavour of @Nullable.

Another question - does the Dataflow update-pipeline feature work only with SchemaCoder? I have SerializableCoder and ProtoCoder being used in some of my other pipelines. I wonder if I have to change those too, to open the door for updating pipelines there?

On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]> wrote:

Some progress made. Now I can see that the schema has all the needed fields. But all fields in the returned schema are marked as "NOT NULL",
even though the fields are annotated with @javax.annotation.Nullable. Is there a different flavor of @Nullable I need to use on my fields?

On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:

Ah, this isn't a POJO then. In this case, try using DefaultSchema(JavaBeanSchema.class).

On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:

It looks something like this. It currently has around 40 fields.

    @JsonIgnoreProperties(ignoreUnknown = true)
    @DefaultSchema(JavaFieldSchema.class)
    @EqualsAndHashCode
    @ToString
    public class POJO implements SomeInterface, Serializable {

        public static final Integer SOME_CONSTANT_FIELD = 2;

        @JsonProperty("field_1")
        private String field1;

        @JsonProperty("field_2")
        private String field2;

        @Nullable
        @JsonProperty("field_3")
        private Integer field3;

        .....
        @JsonProperty("field_1")
        public String getField1() {
            return field1;
        }

        @JsonProperty("field_1")
        public void setField1(String field1) {
            this.field1 = field1;
        }

        @JsonProperty("field_2")
        public String getField2() {
            return field2;
        }

        @JsonProperty("field_2")
        public void setField2(String field2) {
            this.field2 = field2;
        }

        @JsonProperty("field_3")
        public Integer getField3() {
            return field3;
        }

        @JsonProperty("field_3")
        public void setField3(Integer field3) {
            this.field3 = field3;
        }

        public POJO() {
            // 0-arg empty constructor
        }

        public POJO(POJO other) {
            // ... copy constructor
        }
    }

On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:

Can you paste the code for your Pojo?

On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:

schemaRegistry.getSchema(dataType) is returning an empty schema for me. My Pojo is annotated with the @DefaultSchema(JavaFieldSchema.class). Is there something extra I need to do here to register my class with the schema registry? Note: the code which builds the pipeline sits in a library (a different package) which is imported into my pipeline code.
So perhaps there is some configuration missing which would allow the framework to discover my Pojo and the annotations associated with it.

On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:

On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:

On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:

I think I can make it work now. I found a utility method for building my coder from the class. Something like:

    Class<Data> dataClass = userConfig.getDataClass();
    Coder<Data> dataCoder = SchemaCoder.of(
        schemaRegistry.getSchema(dataClass),
        TypeDescriptor.of(dataClass),
        schemaRegistry.getToRowFunction(dataClass),
        schemaRegistry.getFromRowFunction(dataClass));

> This will work. Though, did annotating the POJO like I said not work?

No, annotation alone does not work, since I am not using concrete classes in the code where the pipeline is being constructed. <Data> above is a type parameter of the class which constructs the pipeline.

On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:

Removing the setCoder call breaks my pipeline:

    No Coder has been manually specified; you may do so using .setCoder().
    Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for Data.

    Building a Coder using a registered CoderProvider failed.

The reason is that the code which builds the pipeline is based on Java generics. The actual pipeline-building code sets a bunch of parameters which are used to construct the pipeline:

    PCollection<Data> stream = pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)

So I guess I will need to provide some more information to the framework to make the annotation work.

On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:

If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class), that will usually automatically set up schema inference (you'll have to remove the setCoder call).

On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:

How do I set up my pipeline to use Beam's schema encoding? In my current code I am doing something like this:

    PCollection<Data> = pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))

On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:

I don't think we make any guarantees about Avro coder.
Can you use Beam's schema encoding instead?

On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:

Is there a way to programmatically check for compatibility? I would like to fail my unit tests if incompatible changes are made to the Pojo.

On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:

Check the schema of the Avro encoding for the POJO before and after the change, to ensure that they are compatible as you expect.

On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:

This is more of a Dataflow question, I guess, but I am asking here in the hope that someone has faced a similar problem and can help. I am trying to use the "--update" option to update a running Dataflow job. I am noticing that the compatibility checks fail any time I add a new field to my data model. The error says:

    The Coder or type for step XYZ has changed

I am using a Java Pojo for the data, and an Avro coder to serialize the model. I read somewhere that adding new optional fields to the data should work when updating the pipeline.
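The before-and-after check Luke suggests above can be turned into a unit test with Avro's SchemaCompatibility API. A sketch only, assuming org.apache.avro and beam-sdks-java-core on the test classpath; GOLDEN_SCHEMA_JSON (a checked-in copy of the schema the currently running job was launched with) is hypothetical:

```java
// Sketch: fail the build if the new POJO schema cannot read state
// written by the previous job. GOLDEN_SCHEMA_JSON is a hypothetical
// saved copy of the previous job's Avro schema.
Schema oldSchema = new Schema.Parser().parse(GOLDEN_SCHEMA_JSON);
Schema newSchema = AvroCoder.of(POJO.class).getSchema();

// reader = new schema, writer = old schema: the updated job must be
// able to decode bytes encoded by the old one.
SchemaCompatibility.SchemaPairCompatibility result =
    SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);

assertEquals(
    SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
    result.getType());
```

Note this only validates the Avro-level encoding; Dataflow's own compatibility check on coders is a separate gate.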
I am fine with changing the coder, or the implementation of the model, to something which lets me update the pipeline in cases where I add new optional fields to the existing model. Any suggestions?
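For completeness, Pavel's DelegateCoder approach combined with the JSON intermediary gaurav settles on earlier in the thread might be sketched like this. A sketch only, assuming beam-sdks-java-core and jackson-databind on the classpath, with POJO standing in for the actual state model:

```java
// Sketch: encode state as a JSON string via Jackson so that field order
// and added/removed optional fields do not affect the byte encoding.
// Requires beam-sdks-java-core and jackson-databind; POJO is the
// hypothetical state model from earlier in the thread.
private static final ObjectMapper MAPPER = new ObjectMapper();

static Coder<POJO> jsonStateCoder() {
    return DelegateCoder.of(
        StringUtf8Coder.of(),
        (POJO value) -> MAPPER.writeValueAsString(value),    // POJO -> JSON
        (String json) -> MAPPER.readValue(json, POJO.class)  // JSON -> POJO
    );
}

// Applied to the state cell only, leaving SchemaCoder on PCollections:
// @StateId("prevState")
// private final StateSpec<ValueState<POJO>> prevState =
//     StateSpecs.value(jsonStateCoder());
```

As noted in the thread, this pays for update-safety with a double serialization (object to JSON string, string to bytes).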
