Continuing my journey toward getting my Dataflow jobs to update without having to restart them. As recommended earlier in this thread, I am now using SchemaCoder for all PCollections. For my StateSpec I am now storing a single String object:

private final StateSpec<ValueState<String>> stateCache = StateSpecs.value(StringUtf8Coder.of());
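For context, this is roughly how the state is read and written in the DoFn (a simplified sketch; MyState and MyEvent are placeholders for my actual classes, and MAPPER is a Jackson ObjectMapper):

private static final ObjectMapper MAPPER = new ObjectMapper();

@StateId("stateCache")
private final StateSpec<ValueState<String>> stateCache = StateSpecs.value(StringUtf8Coder.of());

@ProcessElement
public void processElement(
    @Element KV<String, MyEvent> element,
    @StateId("stateCache") ValueState<String> state) throws Exception {
  // The previous state is stored as a JSON string; deserialize it with Jackson.
  String json = state.read();
  MyState current = (json == null) ? new MyState() : MAPPER.readValue(json, MyState.class);

  // ... update "current" based on the incoming element ...

  // Serialize back to a JSON string before writing the state.
  state.write(MAPPER.writeValueAsString(current));
}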
But I am still getting errors when the new job tries to read the previous state from the StateSpec. The errors look like this:

Error message from worker: org.apache.beam.sdk.coders.CoderException: java.io.EOFException: reached end of stream after reading 31 bytes; 52 bytes expected
    org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:104)
    org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
    org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
    org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:435)
    org.apache.beam.sdk.coders.Coder$ByteBuddy$N2jlNc4o.decode(Unknown Source)
    org.apache.beam.sdk.coders.Coder$ByteBuddy$N2jlNc4o.decode(Unknown Source)
    org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
    org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    org.apache.beam.runners.dataflow.worker.UngroupedWindmillReader$UngroupedWindmillReaderIterator.decode(UngroupedWindmillReader.java:127)
    org.apache.beam.runners.dataflow.worker.UngroupedWindmillReader$UngroupedWindmillReaderIterator.decodeMessage(UngroupedWindmillReader.java:118)
    org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.advance(WindmillReaderIteratorBase.java:60)
    org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.start(WindmillReaderIteratorBase.java:46)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:375)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:205)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException: reached end of stream after reading 31 bytes; 52 bytes expected
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:780)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:762)
    org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:60)
    org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
    org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
    org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
    org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:435)

On Tue, Jan 18, 2022 at 5:28 PM Reuven Lax <[email protected]> wrote:
> Yes please.
>
> On Tue, Jan 18, 2022 at 5:13 PM gaurav mishra <[email protected]> wrote:
>> On Tue, Jan 11, 2022 at 4:04 PM Reuven Lax <[email protected]> wrote:
>>> Probably a bug - JavaFieldSchema does walk up the inheritance tree (see ReflectUtils.getFields)
>> Should I open a Jira for this?
>>> Do you have a github account? If so I can add you as a reviewer on this PR.
>>>
>>> On Tue, Jan 11, 2022 at 3:58 PM gaurav mishra <[email protected]> wrote:
>>>> That's a valid point. I can live with the current solution for now, and hope that Dataflow will add that support soon.
>>>>
>>>> Another question regarding the JavaBeanSchema implementation: I noticed we are using clazz.getDeclaredMethods() for preparing the schema. This ignores the base class's getters; is there a reason we are not using clazz.getMethods() for preparing the schema?
>>>>
>>>> On Tue, Jan 11, 2022 at 3:24 PM Reuven Lax <[email protected]> wrote:
>>>>> What matters is the physical order of the fields in the schema encoding. If you add gaps, you'll be fine initially (since the code I added simply sorts by the FieldNumber), however you'll have problems on update.
>>>>>
>>>>> For example, let's say you have the following:
>>>>>
>>>>> @SchemaFieldNumber("0") String fieldOne;
>>>>> @SchemaFieldNumber("50") String fieldTwo;
>>>>>
>>>>> If you remove the check, things will appear just fine. However, if you then add a new field number in that gap, you'll have problems:
>>>>>
>>>>> @SchemaFieldNumber("0") String fieldOne;
>>>>> @SchemaFieldNumber("1") String fieldThree;
>>>>> @SchemaFieldNumber("50") String fieldTwo;
>>>>>
>>>>> Now fieldThree will be expected to be the second field encoded, and you won't be able to update.
>>>>>
>>>>> Dataflow handles schema updates by keeping track of the full map of field name -> encoding position from the old job and telling the coders of the new job to maintain the old encoding positions; this forces all new fields to be added to the end of the encoding. I think the long-term solution here is for Dataflow to support this on state variables as well.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Jan 11, 2022 at 1:55 PM gaurav mishra <[email protected]> wrote:
>>>>>> Hi Reuven,
>>>>>> In your patch I just dropped the check for gaps in the numbers and introduced gaps in the annotated field numbers in my beans. The generated schema seems to be consistent and still works across pipeline updates.
>>>>>>
>>>>>> Preconditions.checkState(
>>>>>>     number == i,
>>>>>>     "Expected field number " + i + " for field: " + type.getName() + " instead got " + number);
>>>>>>
>>>>>> Is there any reason why we are checking for gaps there?
>>>>>>
>>>>>> On Mon, Jan 10, 2022 at 5:37 PM gaurav mishra <[email protected]> wrote:
>>>>>>> On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:
>>>>>>>> Gaurav,
>>>>>>>>
>>>>>>>> Can you try this out and tell me if it works for you? https://github.com/apache/beam/pull/16472
>>>>>>>>
>>>>>>>> This allows you to annotate all your getters with SchemaFieldNumber, to specify the order of fields.
>>>>>>>
>>>>>>> Hi Reuven,
>>>>>>> I am able to build and try out your code, and it seems to be working fine so far. I will continue to do more testing with this build. But I do see a small issue with the current implementation: the documentation says no gaps are allowed in those numbers, which makes it hard to use this on classes which inherit from another class that uses this annotation.
>>>>>>>
>>>>>>> In my case I have an Input Bean and an Output Bean that extends the Input Bean, and I have to configure coders for both of these beans. The Input Bean today uses numbers 0-40, and the Output Bean uses 41-46. If tomorrow I add a new field to the Input Bean, I will have to use 47 on that field to avoid duplicates in the Output Bean. The only solution to this problem I can think of is to allow gaps in the numbers.
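>>>>>>> Concretely, the two beans look roughly like this (a sketch with illustrative field names):
>>>>>>>
>>>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>>> public class InputBean {
>>>>>>>     private String fieldZero;
>>>>>>>     @SchemaFieldNumber("0")
>>>>>>>     public String getFieldZero() { return fieldZero; }
>>>>>>>     public void setFieldZero(String v) { fieldZero = v; }
>>>>>>>     // ... more getters, numbered up through 40 ...
>>>>>>> }
>>>>>>>
>>>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>>> public class OutputBean extends InputBean {
>>>>>>>     private String fieldFortyOne;
>>>>>>>     // Has to start after the base class's highest number (40), and any new
>>>>>>>     // base-class field would have to jump past the numbers used here.
>>>>>>>     @SchemaFieldNumber("41")
>>>>>>>     public String getFieldFortyOne() { return fieldFortyOne; }
>>>>>>>     public void setFieldFortyOne(String v) { fieldFortyOne = v; }
>>>>>>> }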
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>> Yes, timers will get carried over.
>>>>>>>>>
>>>>>>>>> On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <[email protected]> wrote:
>>>>>>>>>> That DelegateCoder seems like a good idea to keep the DoFn code clean. In the short term I will perhaps use a JSON string as my intermediary and let Jackson do the serialization and deserialization. From what I have learned so far, Dataflow does some magic in the background to re-order fields to make the PCollection schema compatible with the previous job (if I use SchemaCoder), so I don't have to switch away from Java beans yet. I can use the DelegateCoder for my StateSpec and continue using SchemaCoder for PCollections; a rough sketch of the wiring is below.
>>>>>>>>>>
>>>>>>>>>> @Reuven Regarding timers in a stateful DoFn, do they get carried over to the new job in Dataflow?
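>>>>>>>>>> The wiring I have in mind, roughly (a sketch; MyState is a placeholder for my actual state model, MAPPER is a Jackson ObjectMapper):
>>>>>>>>>>
>>>>>>>>>> private static final ObjectMapper MAPPER = new ObjectMapper();
>>>>>>>>>>
>>>>>>>>>> // A coder that stores MyState as a JSON string under the hood, so the
>>>>>>>>>> // DoFn itself keeps working with typed state.
>>>>>>>>>> private static final Coder<MyState> STATE_CODER =
>>>>>>>>>>     DelegateCoder.of(
>>>>>>>>>>         StringUtf8Coder.of(),
>>>>>>>>>>         (MyState state) -> MAPPER.writeValueAsString(state), // MyState -> JSON
>>>>>>>>>>         json -> MAPPER.readValue(json, MyState.class));      // JSON -> MyState
>>>>>>>>>>
>>>>>>>>>> @StateId("stateCache")
>>>>>>>>>> private final StateSpec<ValueState<MyState>> stateCache = StateSpecs.value(STATE_CODER);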
>>>>>>>>>> On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]> wrote:
>>>>>>>>>>> Hello Gaurav,
>>>>>>>>>>>
>>>>>>>>>>> It looks like issues related to state evolution come up quite often on this mailing list, and they do not seem to be covered in the Beam docs, unfortunately.
>>>>>>>>>>>
>>>>>>>>>>> I am not familiar with the Google Dataflow runner, but the way I achieved state evolution on the Flink runner was a combo of:
>>>>>>>>>>> 1 - version-controlled Avro schemas for generating POJOs (models) which were used in stateful Beam DoFns
>>>>>>>>>>> 2 - a generic Avro-generated POJO as an intermediary - (schema: string, payload: bytes)
>>>>>>>>>>> 3 - https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html responsible for converting a model into an intermediary (2)
>>>>>>>>>>>
>>>>>>>>>>> This allowed adding and dropping nullable fields and making other Avro forward-compatible changes without losing the state. The order of fields also didn't matter in this setting. It did mean double serialization and deserialization, unfortunately, but I wasn't able to come up with any better solution, given that there seemed to be no public info available about the topic - or maybe due to the lack of my experience.
>>>>>>>>>>>
>>>>>>>>>>> Relevant mail threads: https://www.mail-archive.com/[email protected]/msg07249.html
>>>>>>>>>>>
>>>>>>>>>>> Best Regards,
>>>>>>>>>>> Pavel Solomin
>>>>>>>>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin: https://www.linkedin.com/in/pavelsolomin
>>>>>>>>>>>
>>>>>>>>>>> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>> Unfortunately, as you've discovered, Dataflow's update only pays attention to actual PCollections, and does not look at DoFn state variables at all (historically, stateful DoFns did not yet exist when Dataflow implemented its update support).
>>>>>>>>>>>>>
>>>>>>>>>>>>> You can file a feature request against Google to implement update on state variables, as this is honestly a major feature gap. In the meantime there might be some workarounds you can use until this support is added, e.g.:
>>>>>>>>>>>>> - you could convert the data into a protocol buffer before storing it into state; ProtoCoder should handle new fields, and fields in a protocol buffer have a deterministic order.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there an easy way to go from a Java object to a protocol buffer object? The only way I can think of is to first serialize the Java model to JSON and then deserialize the JSON into a protobuf object, and the same for the other way around. While I was writing this I realized I can maybe store the JSON dump of my Java object in the state spec. That should also work.
>>>>>>>>>>>>
>>>>>>>>>>>>> - you could explicitly use the Row object, although this will again make your code more complex.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another intermediate option - we could fairly easily add a @SchemaFieldId annotation to Beam that would allow you to specify the order of fields. You would have to annotate every getter, something like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> @SchemaFieldId(0)
>>>>>>>>>>>>> public T getV1();
>>>>>>>>>>>>>
>>>>>>>>>>>>> @SchemaFieldId(1)
>>>>>>>>>>>>> public T getV2();
>>>>>>>>>>>>
>>>>>>>>>>>> Yes please. If this is an easy change and if you can point me in the right direction, I will be happy to submit a patch for this.
>>>>>>>>>>>>
>>>>>>>>>>>>> etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>> SerializableCoder can be tricky for updates, and can also be very inefficient.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As you noticed, when inferring schemas from a Java class, the order of fields is non-deterministic (not much that we can do about this, as Java doesn't return the fields in a deterministic order).
>>>>>>>>>>>>>> For regular PCollections, Dataflow (though currently only Dataflow) will allow update even though the field orders have changed. Unfortunately
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>> I now have SchemaCoder for all data flowing through the pipeline. I am noticing another issue once a new pipeline is started with the --update flag. In my stateful DoFn, where I do prevState.read(), exceptions are being thrown - errors like:
>>>>>>>>>>>>>>> Caused by: java.io.EOFException: reached end of stream after reading 68 bytes; 101 bytes expected
>>>>>>>>>>>>>>> In this case there was no change in code or data model when relaunching the pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It seems that the Schema for my Bean is changing on every run of the pipeline. I manually inspected the Schema returned by the SchemaRegistry by running the pipeline a few times locally, and each time I noticed the order of fields in the Schema was different. This changing order of fields is breaking the new pipeline. Is there a way to enforce some kind of ordering on the fields?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>> Looking at the source code for JavaBeanSchema, it seems I will have to annotate my getters and setters with some flavour of @Nullable.
>>>>>>>>>>>>>>>> Another question - does the Dataflow update pipeline feature work only with SchemaCoder? I have SerializableCoder and ProtoCoder being used in some of my other pipelines. I wonder if I have to change those too to open the door for updating pipelines there?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>> Some progress made. Now I can see that the schema has all the needed fields, but all fields in the returned schema are marked as "NOT NULL", even though the fields are annotated with @javax.annotation.Nullable. Is there a different flavor of @Nullable I need to use on my fields?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>> Ah, this isn't a POJO then. In this case, try using @DefaultSchema(JavaBeanSchema.class).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>>>>>>>>> @EqualsAndHashCode
>>>>>>>>>>>>>>>>>>> @ToString
>>>>>>>>>>>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>>>     private String field1;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>>>     private String field2;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @Nullable
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>>>     private Integer field3;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     // ..... (around 40 fields in total)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>>>     public String getField1() {
>>>>>>>>>>>>>>>>>>>         return field1;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>>>>>>>>>>>         this.field1 = field1;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>>>     public String getField2() {
>>>>>>>>>>>>>>>>>>>         return field2;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>>>>>>>>>>>         this.field2 = field2;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>>>     public Integer getField3() {
>>>>>>>>>>>>>>>>>>>         return field3;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>>>>>>>>>>>         this.field3 = field3;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     public POJO() {
>>>>>>>>>>>>>>>>>>>         // 0-arg empty constructor
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>>>>>>>>>>>         // ... copy constructor
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>> schemaRegistry.getSchema(dataType) is returning an empty schema for me. My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class). Is there something extra I need to do here to register my class with the schema registry?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Note: the code which builds the pipeline sits in a library (a different package) which is imported into my pipeline code. So perhaps some configuration is missing that would allow the framework to discover my Pojo and the annotations associated with it.
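>>>>>>>>>>>>>>>>>>>>> For reference, this is roughly how the schema is being looked up (sketch; dataType is the Class object handed to the library through its config):
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry();
>>>>>>>>>>>>>>>>>>>>> Schema schema = schemaRegistry.getSchema(dataType); // throws NoSuchSchemaException
>>>>>>>>>>>>>>>>>>>>> System.out.println(schema);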
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> I think I can make it work now. I found a utility method for building my coder from the class. Something like:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>>>>>>>>>>>>>>     SchemaCoder.of(
>>>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>>>>>>>>>>>         TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I said not work?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> No, annotation alone does not work, since I am not using concrete classes in the code where the pipeline is being constructed. <Data> above is a generic type variable in the class which constructs the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> Removing the setCoder call breaks my pipeline:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>>>>>>>>>>>>>>>>>>> Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for Data.
>>>>>>>>>>>>>>>>>>>>>>>>> Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The reason is that the code which builds the pipeline is based on Java generics. The actual pipeline-building code sets a bunch of parameters which are used to construct the pipeline:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream = pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder);
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> So I guess I will need to provide some more information to the framework to make the annotation work.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> If you annotate your POJO with @DefaultSchema(JavaFieldSchema.class), that will usually automatically set up schema inference (you'll have to remove the setCoder call).
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding? In my current code I am doing something like this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream = pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro coder. Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for compatibility? I would like to fail my unit tests if incompatible changes are made to the Pojo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO before and after the change to ensure that they are compatible as you expect.
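>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, something along these lines in a unit test (a sketch; EXPECTED_SCHEMA_JSON is a copy of the current schema string checked in with your code):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // The Avro schema that AvroCoder will use to encode the POJO.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> org.apache.avro.Schema schema = AvroCoder.of(POJO.class).getSchema();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // Fail the test on any schema change, then review whether the change
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // is backwards compatible before updating the golden copy.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> assertEquals(EXPECTED_SCHEMA_JSON, schema.toString());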
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question, I guess, but I am asking here in hopes that someone has faced a similar problem and can help. I am trying to use the "--update" option to update a running Dataflow job. I am noticing that the compatibility checks fail any time I add a new field to my data model. The error says:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ has changed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for the data and an Avro coder to serialize the model. I read somewhere that adding new optional fields to the data should work when updating the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or the implementation of the model to something which allows me to update the pipeline when I add new optional fields to the existing model. Any suggestions?
