Continuing on my journey to get my Dataflow jobs to update without having
to restart them.
As recommended above, I am now using SchemaCoder for all PCollections.
For my StateSpec I am now storing a single String object:
private final StateSpec<ValueState<String>> stateCache =
    StateSpecs.value(StringUtf8Coder.of());
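
For context, the state is written and read roughly like this - a minimal
sketch, not the actual job code; MyPojo stands in for my real model, and I'm
assuming a shared Jackson ObjectMapper does the JSON conversion:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class CacheDoFn extends DoFn<KV<String, MyPojo>, MyPojo> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @StateId("stateCache")
    private final StateSpec<ValueState<String>> stateCache =
        StateSpecs.value(StringUtf8Coder.of());

    @ProcessElement
    public void processElement(
            @Element KV<String, MyPojo> element,
            @StateId("stateCache") ValueState<String> cache,
            OutputReceiver<MyPojo> out) throws Exception {
        // The previous state is stored as a plain JSON string; null for a new key.
        String prevJson = cache.read();
        MyPojo prev =
            prevJson == null ? null : MAPPER.readValue(prevJson, MyPojo.class);
        // ... merge prev with the incoming element here ...
        // Write the updated state back as a JSON string.
        cache.write(MAPPER.writeValueAsString(element.getValue()));
        out.output(element.getValue());
    }
}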

But I am still getting errors when the new job tries to read the previous
state from the StateSpec. The errors look like this:
Error message from worker: org.apache.beam.sdk.coders.CoderException:
java.io.EOFException: reached end of stream after reading 31 bytes; 52
bytes expected
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:104)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:435)
org.apache.beam.sdk.coders.Coder$ByteBuddy$N2jlNc4o.decode(Unknown Source)
org.apache.beam.sdk.coders.Coder$ByteBuddy$N2jlNc4o.decode(Unknown Source)
org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
org.apache.beam.runners.dataflow.worker.UngroupedWindmillReader$UngroupedWindmillReaderIterator.decode(UngroupedWindmillReader.java:127)
org.apache.beam.runners.dataflow.worker.UngroupedWindmillReader$UngroupedWindmillReaderIterator.decodeMessage(UngroupedWindmillReader.java:118)
org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.advance(WindmillReaderIteratorBase.java:60)
org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.start(WindmillReaderIteratorBase.java:46)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:375)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:205)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException: reached end of stream after reading 31 bytes;
52 bytes expected
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:780)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:762)
org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:60)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:435)

On Tue, Jan 18, 2022 at 5:28 PM Reuven Lax <[email protected]> wrote:

> Yes please.
>
> On Tue, Jan 18, 2022 at 5:13 PM gaurav mishra <
> [email protected]> wrote:
>
>>
>>
>> On Tue, Jan 11, 2022 at 4:04 PM Reuven Lax <[email protected]> wrote:
>>
>>> Probably a bug - JavaFieldSchema does walk up the inheritance tree (see
>>> ReflectUtils.getFields)
>>>
>> Should I open a Jira for this?
>>
>>
>>>
>>> Do you have a github account? If so I can add you as a reviewer on this
>>> PR.
>>>
>>> On Tue, Jan 11, 2022 at 3:58 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> That's a valid point. I can live with the current solution for now, and
>>>> hope that Dataflow will add that support soon.
>>>>
>>>> Another question regarding the JavaBeanSchema implementation: I noticed we
>>>> are using clazz.getDeclaredMethods() for preparing the schema. This ignores
>>>> the base class's getters; is there a reason we are not using
>>>> clazz.getMethods() instead?
>>>>
>>>> On Tue, Jan 11, 2022 at 3:24 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> What matters is the physical order of the fields in the schema
>>>>> encoding. If you add gaps, you'll be fine initially (since the code I
>>>>> added simply sorts by the FieldNumber); however, you'll have problems on
>>>>> update.
>>>>>
>>>>> For example, let's say you have the following
>>>>>
>>>>>    SchemaFieldNumber("0") String fieldOne;
>>>>>    SchemaFieldNumber("50") String fieldTwo;
>>>>>
>>>>> If you remove the check, things will appear just fine. However if you
>>>>> then add a new field number in that gap, you'll have problems:
>>>>>
>>>>>    SchemaFieldNumber("0") String fieldOne;
>>>>>    SchemaFieldNumber("1") String fieldThree;
>>>>>    SchemaFieldNumber("50") String fieldTwo;
>>>>>
>>>>> Now fieldThree will be expected to be the second field encoded, and
>>>>> you won't be able to update.
>>>>>
>>>>> Dataflow handles schema updates by keeping track of the full map of
>>>>> field name -> encoding position from the old job and telling the coders of
>>>>> the new job to maintain the old encoding positions; this forces all new
>>>>> fields to be added to the end of the encoding. I think the long-term
>>>>> solution here is for Dataflow to support this on state variables as well.
>>>>>
>>>>> Reuven
>>>>>
>>>>>
>>>>> On Tue, Jan 11, 2022 at 1:55 PM gaurav mishra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Reuven,
>>>>>> In your patch I just dropped the check for gaps in the numbers and
>>>>>> introduced gaps in the annotated field numbers in my beans. Generated
>>>>>> schema seems to be consistent and still works across pipeline updates.
>>>>>>
>>>>>> Preconditions.checkState(
>>>>>>     number == i,
>>>>>>     "Expected field number "
>>>>>>         + i
>>>>>>         + " for field: "
>>>>>>         + type.getName()
>>>>>>         + " instead got "
>>>>>>         + number);
>>>>>>
>>>>>> Is there any reason why we are checking for gaps there?
>>>>>>
>>>>>> On Mon, Jan 10, 2022 at 5:37 PM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Gaurav,
>>>>>>>>
>>>>>>>> Can you try this out and tell me if it works for you?
>>>>>>>> https://github.com/apache/beam/pull/16472
>>>>>>>>
>>>>>>>> This allows you to annotate all your getters with
>>>>>>>> SchemaFieldNumber, to specify the order of fields.
>>>>>>>>
>>>>>>> Hi Reuven
>>>>>>> I am able to build and try out your code, and it seems to be working
>>>>>>> fine so far. I will continue to do more testing with this build. But I do
>>>>>>> see a small issue with the current implementation. The documentation
>>>>>>> says no gaps are allowed in those numbers, which makes it hard to use
>>>>>>> this annotation on classes that inherit from another class that also uses
>>>>>>> it.
>>>>>>> In my case I have an input bean and an output bean that extends the
>>>>>>> input bean. I have to configure coders for both of these beans. The input
>>>>>>> bean today uses numbers 0-40, and the output bean uses 41-46. If tomorrow
>>>>>>> I add a new field to the input bean, I will have to use 47 on that field
>>>>>>> to avoid duplicates in the output bean (sketched below). The only way
>>>>>>> around this problem I can think of is to allow gaps in the numbers.
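>>>>>>>
>>>>>>> To illustrate, a minimal sketch with hypothetical field names (the real
>>>>>>> beans have many more fields):
>>>>>>>
>>>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>>> public class InputBean {
>>>>>>>     private String fieldA;
>>>>>>>
>>>>>>>     @SchemaFieldNumber("0")
>>>>>>>     public String getFieldA() { return fieldA; }
>>>>>>>     public void setFieldA(String fieldA) { this.fieldA = fieldA; }
>>>>>>>     // ... numbers 1-40 on the remaining input getters ...
>>>>>>> }
>>>>>>>
>>>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>>> public class OutputBean extends InputBean {
>>>>>>>     private String fieldX;
>>>>>>>
>>>>>>>     @SchemaFieldNumber("41")
>>>>>>>     public String getFieldX() { return fieldX; }
>>>>>>>     public void setFieldX(String fieldX) { this.fieldX = fieldX; }
>>>>>>>     // ... numbers 42-46 on the remaining output getters ...
>>>>>>> }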
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, timers will get carried over.
>>>>>>>>>
>>>>>>>>> On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> That DelegateCoder seems like a good idea to keep the DoFn code
>>>>>>>>>> clean. In the short term I will perhaps use a JSON string as my
>>>>>>>>>> intermediary and let Jackson do the serialization and deserialization
>>>>>>>>>> (roughly as sketched below). From what I have learned so far, Dataflow
>>>>>>>>>> does some magic in the background to re-order fields to make the
>>>>>>>>>> PCollection schema compatible with the previous job (if I use
>>>>>>>>>> SchemaCoder), so I don't have to switch away from Java beans yet. I can
>>>>>>>>>> use the DelegateCoder for my StateSpec and continue using SchemaCoder
>>>>>>>>>> for PCollections.
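>>>>>>>>>>
>>>>>>>>>> Roughly what I have in mind for the state coder - a sketch, with MyPojo
>>>>>>>>>> standing in for my real model and a shared Jackson ObjectMapper:
>>>>>>>>>>
>>>>>>>>>> private static final ObjectMapper MAPPER = new ObjectMapper();
>>>>>>>>>>
>>>>>>>>>> private static final Coder<MyPojo> STATE_CODER =
>>>>>>>>>>     DelegateCoder.of(
>>>>>>>>>>         StringUtf8Coder.of(),
>>>>>>>>>>         (MyPojo pojo) -> MAPPER.writeValueAsString(pojo),       // model -> JSON
>>>>>>>>>>         (String json) -> MAPPER.readValue(json, MyPojo.class)); // JSON -> model
>>>>>>>>>>
>>>>>>>>>> @StateId("stateCache")
>>>>>>>>>> private final StateSpec<ValueState<MyPojo>> stateCache =
>>>>>>>>>>     StateSpecs.value(STATE_CODER);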
>>>>>>>>>>
>>>>>>>>>> @Reuven
>>>>>>>>>> Regarding timers in stateful DoFn, do they get carried over to
>>>>>>>>>> the new job in Dataflow?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Gaurav,
>>>>>>>>>>>
>>>>>>>>>>> It looks like issues related to state evolution come up quite
>>>>>>>>>>> often on this mailing list, and they do not seem to be covered in the
>>>>>>>>>>> Beam docs, unfortunately.
>>>>>>>>>>>
>>>>>>>>>>> I am not familiar with Google Dataflow runner, but the way I
>>>>>>>>>>> achieved state evolution on Flink runner was a combo of:
>>>>>>>>>>> 1 - version-controlled Avro schemas for generating POJOs
>>>>>>>>>>> (models) which were used in stateful Beam DoFns
>>>>>>>>>>> 2 - a generic Avro-generated POJO as an intermediary - ( schema:
>>>>>>>>>>> string, payload: bytes )
>>>>>>>>>>> 3 -
>>>>>>>>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html
>>>>>>>>>>> responsible for converting a model into an intermediary (2)
>>>>>>>>>>>
>>>>>>>>>>> This allowed adding and dropping nullable fields and making other
>>>>>>>>>>> Avro forward-compatible changes without losing the state. The order of
>>>>>>>>>>> fields also didn't matter in this setting. It did mean double
>>>>>>>>>>> serialization and deserialization, unfortunately, but I wasn't able to
>>>>>>>>>>> come up with a better solution, given that there seemed to be no public
>>>>>>>>>>> info available about the topic - or maybe that was due to my lack of
>>>>>>>>>>> experience.
>>>>>>>>>>>
>>>>>>>>>>> Relevant mail threads:
>>>>>>>>>>> https://www.mail-archive.com/[email protected]/msg07249.html
>>>>>>>>>>>
>>>>>>>>>>> Best Regards,
>>>>>>>>>>> Pavel Solomin
>>>>>>>>>>>
>>>>>>>>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>>>>>>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately, as you've discovered, Dataflow's update only
>>>>>>>>>>>>> pays attention to actual PCollections, and does not look at DoFn 
>>>>>>>>>>>>> state
>>>>>>>>>>>>> variables at all (historically, stateful DoFns did not yet exist 
>>>>>>>>>>>>> when
>>>>>>>>>>>>> Dataflow implemented its update support).
>>>>>>>>>>>>>
>>>>>>>>>>>>> You can file a feature request against Google to implement
>>>>>>>>>>>>> update on state variables, as this is honestly a major feature 
>>>>>>>>>>>>> gap. In the
>>>>>>>>>>>>> meantime there might be some workarounds you can use until this 
>>>>>>>>>>>>> support is
>>>>>>>>>>>>> added. e.g.
>>>>>>>>>>>>>    - you could convert the data into a protocol buffer before
>>>>>>>>>>>>> storing into state; ProtoCoder should handle new fields, and 
>>>>>>>>>>>>> fields in a
>>>>>>>>>>>>> protocol buffer have a deterministic order.
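>>>>>>>>>>>>>    For state, that would look roughly like this (MyStateProto being
>>>>>>>>>>>>>    a hypothetical generated message class):
>>>>>>>>>>>>>
>>>>>>>>>>>>>    private final StateSpec<ValueState<MyStateProto>> stateCache =
>>>>>>>>>>>>>        StateSpecs.value(ProtoCoder.of(MyStateProto.class));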
>>>>>>>>>>>>>
>>>>>>>>>>>> Is there an easy way to go from a Java object to a protocol
>>>>>>>>>>>> buffer object? The only way I can think of is to first serialize the
>>>>>>>>>>>> Java model to JSON and then deserialize the JSON into a protobuf
>>>>>>>>>>>> object, and the same for the other way around. While I was writing
>>>>>>>>>>>> this I realized I could instead store the JSON dump of my Java object
>>>>>>>>>>>> in the state spec. That should also work.
>>>>>>>>>>>>
>>>>>>>>>>>>>    - you could explicitly use the Row object, although this
>>>>>>>>>>>>> will again make your code more complex.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> Another intermediate option - we could fairly easily add
>>>>>>>>>>>>> a @SchemaFieldId annotation to Beam that would allow you to 
>>>>>>>>>>>>> specify the
>>>>>>>>>>>>> order of fields. You would have to annotate every getter, 
>>>>>>>>>>>>> something like
>>>>>>>>>>>>> this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> @SchemaFieldId(0)
>>>>>>>>>>>>> public T getV1();
>>>>>>>>>>>>>
>>>>>>>>>>>>> @SchemaFieldId(1)
>>>>>>>>>>>>> public T getV2();
>>>>>>>>>>>>>
>>>>>>>>>>>> Yes please. If this is an easy change and if you can point me
>>>>>>>>>>>> in the right direction, I will be happy to submit a patch for this.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> SerializableCoder can be tricky for updates, and can also be
>>>>>>>>>>>>>> very inefficient.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As you noticed, when inferring schemas from a Java class, the
>>>>>>>>>>>>>> order of fields is non-deterministic (not much that we can do about this,
>>>>>>>>>>>>>> about this,
>>>>>>>>>>>>>> as Java doesn't return the fields in a deterministic order). For 
>>>>>>>>>>>>>> regular
>>>>>>>>>>>>>> PCollections, Dataflow (though currently only Dataflow) will 
>>>>>>>>>>>>>> allow update
>>>>>>>>>>>>>> even though the field orders have changed. Unfortunately
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I now have SchemaCoder for all data flowing through
>>>>>>>>>>>>>>> the pipeline.
>>>>>>>>>>>>>>> I am noticing another issue once a new pipeline is started
>>>>>>>>>>>>>>> with --update flag. In my stateful DoFn where I am doing 
>>>>>>>>>>>>>>> prevState.read(),
>>>>>>>>>>>>>>> exceptions are being thrown. errors like
>>>>>>>>>>>>>>> Caused by: java.io.EOFException: reached end of stream after
>>>>>>>>>>>>>>> reading 68 bytes; 101 bytes expected
>>>>>>>>>>>>>>> In this case there was no change in code or data model when
>>>>>>>>>>>>>>> relaunching the pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It seems that the schema for my bean is changing on every run of
>>>>>>>>>>>>>>> the pipeline. I manually inspected the Schema returned by the
>>>>>>>>>>>>>>> SchemaRegistry by running the pipeline a few times locally and 
>>>>>>>>>>>>>>> each time I
>>>>>>>>>>>>>>> noticed the order of fields in the Schema was different. This 
>>>>>>>>>>>>>>> changing of
>>>>>>>>>>>>>>> order of fields is breaking the new pipeline. Is there a way to 
>>>>>>>>>>>>>>> enforce
>>>>>>>>>>>>>>> some kind of ordering on the fields?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Looking at the source code for JavaBeanSchema, it seems I
>>>>>>>>>>>>>>>> will have to annotate my getters and setters with some flavour of @Nullable.
>>>>>>>>>>>>>>>> Another question -
>>>>>>>>>>>>>>>> Another question -
>>>>>>>>>>>>>>>> does the Dataflow update-pipeline feature work only with
>>>>>>>>>>>>>>>> SchemaCoder? I have SerializableCoder and ProtoCoder being 
>>>>>>>>>>>>>>>> used in some of
>>>>>>>>>>>>>>>> my other pipelines. I wonder if I have to change those too to 
>>>>>>>>>>>>>>>> open doors
>>>>>>>>>>>>>>>> for updating pipelines there?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Some progress made.
>>>>>>>>>>>>>>>>> Now I can see that the schema has all the needed fields, but
>>>>>>>>>>>>>>>>> all fields in the returned schema are marked as "NOT NULL", even though the
>>>>>>>>>>>>>>>>> fields are annotated with @javax.annotation.Nullable.
>>>>>>>>>>>>>>>>> fields are annotated with @javax.annotation.Nullable.
>>>>>>>>>>>>>>>>> Is there a different flavor of @Nullable I need to use on
>>>>>>>>>>>>>>>>> my fields?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>>>>>>>>>>>>>> DefaultSchema(JavaBeanSchema.class)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It looks something like this. It currently has around 40
>>>>>>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>>>>>>>>> @EqualsAndHashCode
>>>>>>>>>>>>>>>>>>> @ToString
>>>>>>>>>>>>>>>>>>> public class POJO implements SomeInterface, Serializable
>>>>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>>>     private String field1;
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>>>     private String field2;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @Nullable
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>>>     private Integer field3;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     .....
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>>>     public String getField1() {
>>>>>>>>>>>>>>>>>>>         return field1;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>>>>>>>>>>>         this.field1 = field1;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>>>     public String getField2() {
>>>>>>>>>>>>>>>>>>>         return field2;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>>>>>>>>>>>         this.field2 = field2;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>>>     public Integer getField3() {
>>>>>>>>>>>>>>>>>>>         return field3;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>>>>>>>>>>>         this.field3 = field3;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     public POJO() {
>>>>>>>>>>>>>>>>>>>         // 0-arg empty constructor
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>>>>>>>>>>>         // ... copy constructor ...
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning
>>>>>>>>>>>>>>>>>>>>> an empty schema.
>>>>>>>>>>>>>>>>>>>>> My Pojo is annotated with @DefaultSchema(JavaFieldSchema.class).
>>>>>>>>>>>>>>>>>>>>> Is there something extra I need to do here to register
>>>>>>>>>>>>>>>>>>>>> my class with the schema registry?
>>>>>>>>>>>>>>>>>>>>> Note: the code which is building the pipeline is
>>>>>>>>>>>>>>>>>>>>> sitting in a library (Different package) which is being 
>>>>>>>>>>>>>>>>>>>>> imported into my
>>>>>>>>>>>>>>>>>>>>> pipeline code. So perhaps there is some configuration 
>>>>>>>>>>>>>>>>>>>>> which is missing
>>>>>>>>>>>>>>>>>>>>> which allows the framework to discover my Pojo and the 
>>>>>>>>>>>>>>>>>>>>> annotations
>>>>>>>>>>>>>>>>>>>>> associated with it.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I think I can make it work now. I found a utility
>>>>>>>>>>>>>>>>>>>>>>>> method for building my coder from the class.
>>>>>>>>>>>>>>>>>>>>>>>> Something like:
>>>>>>>>>>>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>>>>>>>>>>>> Coder<Data> dataCoder = SchemaCoder.of(
>>>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>>>>>>>>>>>         TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>>>>>>>>>>>         schemaRegistry.getFromRowFunction(dataClass));
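>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> which I can then set on the collection in the generic code, e.g.
>>>>>>>>>>>>>>>>>>>>>>>> stream.setCoder(dataCoder).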
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> This will work. Though, did annotating the POJO like
>>>>>>>>>>>>>>>>>>>>>>> I said not work?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>  No, annotation alone does not work since I am not
>>>>>>>>>>>>>>>>>>>>>> using concrete classes in the code where the pipeline is 
>>>>>>>>>>>>>>>>>>>>>> being constructed.
>>>>>>>>>>>>>>>>>>>>>> <Data> above is a template variable in the class which 
>>>>>>>>>>>>>>>>>>>>>> is constructing the
>>>>>>>>>>>>>>>>>>>>>> pipeline.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> removing the setCoder call breaks my pipeline:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> No Coder has been manually specified;  you may do
>>>>>>>>>>>>>>>>>>>>>>>>> so using .setCoder().
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry
>>>>>>>>>>>>>>>>>>>>>>>>> failed: Unable to provide a Coder for Data.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>   Building a Coder using a registered
>>>>>>>>>>>>>>>>>>>>>>>>> CoderProvider failed.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The reason is that the code which is building the
>>>>>>>>>>>>>>>>>>>>>>>>> pipeline is based on Java Generics. Actual pipeline 
>>>>>>>>>>>>>>>>>>>>>>>>> building code sets a
>>>>>>>>>>>>>>>>>>>>>>>>> bunch of parameters which are used to construct the 
>>>>>>>>>>>>>>>>>>>>>>>>> pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>>>>>>>>>>>>>> So I guess I will need to provide some more
>>>>>>>>>>>>>>>>>>>>>>>>> information to the framework to make the annotation 
>>>>>>>>>>>>>>>>>>>>>>>>> work.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>>>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that 
>>>>>>>>>>>>>>>>>>>>>>>>>> will usually automatically
>>>>>>>>>>>>>>>>>>>>>>>>>> set up schema inference (you'll have to remove the 
>>>>>>>>>>>>>>>>>>>>>>>>>> setCoder call).
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> How to set up my pipeline to use Beam's schema
>>>>>>>>>>>>>>>>>>>>>>>>>>> encoding.
>>>>>>>>>>>>>>>>>>>>>>>>>>> In my current code I am doing something like this
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> =
>>>>>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro
>>>>>>>>>>>>>>>>>>>>>>>>>>>> coder. Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if incompatible changes
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are made to Pojo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Check the schema of the avro encoding for the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> POJO before and after the change to ensure that 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> they are compatible as you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> expect.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but asking here in hopes someone has faced a 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> similar problem and can help.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> update a running Dataflow job. I am noticing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that compatibility checks fail
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> any time I add a new field to my data model.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The error says
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am using a Java POJO for the data and AvroCoder to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> adding new optional fields to the data should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> work when updating the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation of the model to something which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> allows me to update the pipeline when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I add new optional fields to the existing model.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
