Yes.
username - gauravmishraitbhu

On Tue, Jan 11, 2022 at 4:04 PM Reuven Lax <[email protected]> wrote:

> Probably a bug - JavaFieldSchema does walk up the inheritance tree (see
> ReflectUtils.getFields)
>
> Do you have a github account? If so I can add you as a reviewer on this PR.
>
> On Tue, Jan 11, 2022 at 3:58 PM gaurav mishra <
> [email protected]> wrote:
>
>> That's a valid point. I can live with the current solution for now. And
>> hope that Dataflow will add that support soon.
>>
>> Another question regarding JavaBeanSchema implementation, I noticed we
>> are using clazz.getDeclaredMethods() for preparing the schema. This ignores
>> the base class's getters, is there a reason we are not using
>> clazz.getMethods() for preparing the schema.
>>
>> On Tue, Jan 11, 2022 at 3:24 PM Reuven Lax <[email protected]> wrote:
>>
>>> What matters is the physical order of the fields in the schema encoding.
>>> If you add gaps, you'll be fine initially (since the code I added simply
>>> sorts by the FieldNumber), however you'll have problems on update.
>>>
>>> For example, let's say you have the following
>>>
>>>    SchemaFieldNumber("0") String fieldOne;
>>>    SchemaFieldNumber("50") String fieldTwo;
>>>
>>> If you remove the check, things will appear just fine. However if you
>>> then add a new field number in that gap, you'll have problems:
>>>
>>>    SchemaFieldNumber("0") String fieldOne;
>>>    SchemaFieldNumber("1") String fieldThree;
>>>    SchemaFieldNumber("50") String fieldTwo;
>>>
>>> Now fieldThree will be expected to be the second field encoded, and you
>>> won't be able to update.
>>>
>>> Dataflow handles schema updates by keeping track of the full map of
>>> field name -> encoding position from the old job and telling the coders of
>>> the new job to maintain the old encoding positions; this forces all new
>>> fields to be added to the end of the encoding. I think the long-term
>>> solution here is for Dataflow to support this on state variables as well.
>>>
>>> Reuven
>>>
>>>
>>> On Tue, Jan 11, 2022 at 1:55 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> Hi Reuven,
>>>> In your patch I just dropped the check for gaps in the numbers and
>>>> introduced gaps in the annotated field numbers in my beans. Generated
>>>> schema seems to be consistent and still works across pipeline updates.
>>>>
>>>> Preconditions.checkState(
>>>>              number == i,
>>>>              "Expected field number "
>>>>                  + i
>>>>                  + " for field: "
>>>>                  + type.getName()
>>>>                  + " instead got "
>>>>                  + number);
>>>>
>>>> Is there any reason why we are checking for gaps there?
>>>>
>>>> On Mon, Jan 10, 2022 at 5:37 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Gaurav,
>>>>>>
>>>>>> Can you try this out and tell me if it works for you?
>>>>>> https://github.com/apache/beam/pull/16472
>>>>>>
>>>>>> This allows you to annotate all your getters with SchemaFieldNumber,
>>>>>> to specify the order of fields.
>>>>>>
>>>>> Hi Reuven
>>>>> I am able to build and try out your code and it seems to be working
>>>>> fine so far. I will continue to do more testing with this build. But I do
>>>>> see a small issue with the current implementation. In the documentation it
>>>>> says there are no gaps allowed in those numbers which makes it hard to use
>>>>> this on classes which inherit from another class which uses this
>>>>> annotation.
>>>>> In my case I have an input Bean and an Output Bean that extends the
>>>>> Input bean. I have to configure Coders for both these Beans. Input Bean
>>>>> today uses number 0-40, and output Bean uses 41-46. If tomorrow I add a 
>>>>> new
>>>>> field to the input bean then I will have to use 47 on that field. to
>>>>> avoid duplicates in output Bean. The only solution around this problem I
>>>>> can think of is to allow for gaps in the number.
>>>>>
>>>>>
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Yes, timers will get carried over.
>>>>>>>
>>>>>>> On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> That DelegateCoder seems like a good idea to keep the DoFn code
>>>>>>>> clean. In the short term I will perhaps use json string as my 
>>>>>>>> intermediary
>>>>>>>> and will let Jackson do the serialization and deserialization. from 
>>>>>>>> what I
>>>>>>>> have learned so far is that Dataflow does some magic in the background 
>>>>>>>> to
>>>>>>>> re-order fields to make the PCollection schema compatible to the 
>>>>>>>> previous
>>>>>>>> job (if I use SchemaCoder), So I don't have to make a switch from Java
>>>>>>>> beans yet. I can use the DelegateCoder for my statespec and continue
>>>>>>>> using SchemaCoder for PCollections.
>>>>>>>>
>>>>>>>> @Reuven
>>>>>>>> Regarding timers in stateful DoFn, do they get carried over to the
>>>>>>>> new job in Dataflow?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello Gaurav,
>>>>>>>>>
>>>>>>>>> It looks like the issues related to state evolution is something
>>>>>>>>> quite recurrent in this mailing list, and do not seem to be covered 
>>>>>>>>> in the
>>>>>>>>> Beam docs, unfortunately.
>>>>>>>>>
>>>>>>>>> I am not familiar with Google Dataflow runner, but the way I
>>>>>>>>> achieved state evolution on Flink runner was a combo of:
>>>>>>>>> 1 - version-controlled Avro schemas for generating POJOs (models)
>>>>>>>>> which were used in stateful Beam DoFns
>>>>>>>>> 2 - a generic Avro-generated POJO as an intermediary - ( schema:
>>>>>>>>> string, payload: bytes )
>>>>>>>>> 3 -
>>>>>>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html
>>>>>>>>> responsible for converting a model into an intermediary (2)
>>>>>>>>>
>>>>>>>>> This allowed adding & dropping nullable fields and to do other
>>>>>>>>> Avro-forward compatible changes without loosing the state. The order 
>>>>>>>>> of
>>>>>>>>> fields also didn't matter in this setting. And this also meant double 
>>>>>>>>> ser-
>>>>>>>>> and deserialization, unfortunately. But I wasn't able to come to any 
>>>>>>>>> better
>>>>>>>>> solution, given that there seemed to be no public info available 
>>>>>>>>> about the
>>>>>>>>> topic, or, maybe, due to the lack of my experience.
>>>>>>>>>
>>>>>>>>> Relevant mail threads:
>>>>>>>>> https://www.mail-archive.com/[email protected]/msg07249.html
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>> Pavel Solomin
>>>>>>>>>
>>>>>>>>> Tel: +351 962 950 692 <+351%20962%20950%20692> | Skype:
>>>>>>>>> pavel_solomin | Linkedin
>>>>>>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Unfortunately, as you've discovered, Dataflow's update only pays
>>>>>>>>>>> attention to actual PCollections, and does not look at DoFn state 
>>>>>>>>>>> variables
>>>>>>>>>>> at all (historically, stateful DoFns did not yet exist when Dataflow
>>>>>>>>>>> implemented its update support).
>>>>>>>>>>>
>>>>>>>>>>> You can file a feature request against Google to implement
>>>>>>>>>>> update on state variables, as this is honestly a major feature gap. 
>>>>>>>>>>> In the
>>>>>>>>>>> meantime there might be some workarounds you can use until this 
>>>>>>>>>>> support is
>>>>>>>>>>> added. e.g.
>>>>>>>>>>>    - you could convert the data into a protocol buffer before
>>>>>>>>>>> storing into state; ProtoCoder should handle new fields, and fields 
>>>>>>>>>>> in a
>>>>>>>>>>> protocol buffer have a deterministic order.
>>>>>>>>>>>
>>>>>>>>>> Is there an easy way to go from a java object to a protocol
>>>>>>>>>> buffer object. The only way I can think of is to first serialize the 
>>>>>>>>>> java
>>>>>>>>>> model to json and then deserialize the json into a proto buffer 
>>>>>>>>>> object.
>>>>>>>>>> Same for the other way around - protobuf to java object.  while I was
>>>>>>>>>> writing this I realized I can maybe store the json dump of my java 
>>>>>>>>>> object
>>>>>>>>>> in state spec. That should also work.
>>>>>>>>>>
>>>>>>>>>>>    - you could explicitly use the Row object, although this will
>>>>>>>>>>> again make your code more complex.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Another intermediate option - we could fairly easily add
>>>>>>>>>>> a @SchemaFieldId annotation to Beam that would allow you to specify 
>>>>>>>>>>> the
>>>>>>>>>>> order of fields. You would have to annotate every getter, something 
>>>>>>>>>>> like
>>>>>>>>>>> this:
>>>>>>>>>>>
>>>>>>>>>>> @ScheamFieldId(0)
>>>>>>>>>>> public T getV1();
>>>>>>>>>>>
>>>>>>>>>>> @SchemaFieldId(1)
>>>>>>>>>>> public T getV2();
>>>>>>>>>>>
>>>>>>>>>> Yes please. If this is an easy change and If you can point me in
>>>>>>>>>> the right direction I will be happy to submit a patch for this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> etc.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> SerializableCoder can be tricky for updates, and can also be
>>>>>>>>>>>> very inefficient.
>>>>>>>>>>>>
>>>>>>>>>>>> As you noticed, when inferring schemas from a Java class, the
>>>>>>>>>>>> order of fields is non deterministic (not much that we can do 
>>>>>>>>>>>> about this,
>>>>>>>>>>>> as Java doesn't return the fields in a deterministic order). For 
>>>>>>>>>>>> regular
>>>>>>>>>>>> PCollections, Dataflow (though currently only Dataflow) will allow 
>>>>>>>>>>>> update
>>>>>>>>>>>> even though the field orders have changed. Unfortunately
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I now have SchemaCoder for all data flowing through
>>>>>>>>>>>>> the pipeline.
>>>>>>>>>>>>> I am noticing another issue once a new pipeline is started
>>>>>>>>>>>>> with --update flag. In my stateful DoFn where I am doing 
>>>>>>>>>>>>> prevState.read(),
>>>>>>>>>>>>> exceptions are being thrown. errors like
>>>>>>>>>>>>> Caused by: java.io.EOFException: reached end of stream after
>>>>>>>>>>>>> reading 68 bytes; 101 bytes expected
>>>>>>>>>>>>> In this case there was no change in code or data model when
>>>>>>>>>>>>> relaunching the pipeline.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It seems that Schema for my Bean is changing on every run of
>>>>>>>>>>>>> the pipeline. I manually inspected the Schema returned by the
>>>>>>>>>>>>> SchemaRegistry by running the pipeline a few times locally and 
>>>>>>>>>>>>> each time I
>>>>>>>>>>>>> noticed the order of fields in the Schema was different. This 
>>>>>>>>>>>>> changing of
>>>>>>>>>>>>> order of fields is breaking the new pipeline. Is there a way to 
>>>>>>>>>>>>> enforce
>>>>>>>>>>>>> some kind of ordering on the fields?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking at source code for the JavaBeamSchema it seems I will
>>>>>>>>>>>>>> have to annotate my getter and setters with any flavour of 
>>>>>>>>>>>>>> @Nullable.
>>>>>>>>>>>>>> Another question -
>>>>>>>>>>>>>> does the dataflow update pipeline feature work only with
>>>>>>>>>>>>>> SchemaCoder? I have SerializableCoder and ProtoCoder being used 
>>>>>>>>>>>>>> in some of
>>>>>>>>>>>>>> my other pipelines. I wonder if I have to change those too to 
>>>>>>>>>>>>>> open doors
>>>>>>>>>>>>>> for updating pipelines there?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Some progress made.
>>>>>>>>>>>>>>> Now I can see that schema has all the needed fields. But all
>>>>>>>>>>>>>>> fields in the returned schema are marked as "NOT NULL". even 
>>>>>>>>>>>>>>> though the
>>>>>>>>>>>>>>> fields are annotated with @javax.annotation.Nullable.
>>>>>>>>>>>>>>> Is there a different flavor of @Nullable I need to use on my
>>>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>>>>>>>>>>>> DefaultSchema(JavaBeanSchema.class)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It looks something like this. It currently has around 40
>>>>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>>>>>>> @EqualsAndHashCode
>>>>>>>>>>>>>>>>> @ToString
>>>>>>>>>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>     private String field1;
>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>     private String field2;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @Nullable
>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>     private Integer field3;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     .....
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>     public String getField1() {
>>>>>>>>>>>>>>>>>         return field1;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>>>>>>>>>         this.field1 = field1;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>     public String getField2() {
>>>>>>>>>>>>>>>>>         return field2;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>>>>>>>>>         this.field2 = field2;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>     public Integer getField3() {
>>>>>>>>>>>>>>>>>         return field3;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>>>>>>>>>         this.field3 = field3;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     public POJO() {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>          0 arg empty constructor
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>        ... copy constructor
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning a
>>>>>>>>>>>>>>>>>>> empty schema.
>>>>>>>>>>>>>>>>>>> my Pojo is annotated with
>>>>>>>>>>>>>>>>>>> the @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>>>>>>>>> Is there something extra I need to do here to register
>>>>>>>>>>>>>>>>>>> my class with a schema registry.
>>>>>>>>>>>>>>>>>>> Note: the code which is building the pipeline is sitting
>>>>>>>>>>>>>>>>>>> in a library (Different package) which is being imported 
>>>>>>>>>>>>>>>>>>> into my pipeline
>>>>>>>>>>>>>>>>>>> code. So perhaps there is some configuration which is 
>>>>>>>>>>>>>>>>>>> missing which allows
>>>>>>>>>>>>>>>>>>> the framework to discover my Pojo and the annotations 
>>>>>>>>>>>>>>>>>>> associated with it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think I can make it work now. I found a utility
>>>>>>>>>>>>>>>>>>>>>> method for building my coder from class
>>>>>>>>>>>>>>>>>>>>>> Something like
>>>>>>>>>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>>>>>>>>>>>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I
>>>>>>>>>>>>>>>>>>>>> said not work?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>  No, annotation alone does not work since I am not
>>>>>>>>>>>>>>>>>>>> using concrete classes in the code where the pipeline is 
>>>>>>>>>>>>>>>>>>>> being constructed.
>>>>>>>>>>>>>>>>>>>> <Data> above is a template variable in the class which is 
>>>>>>>>>>>>>>>>>>>> constructing the
>>>>>>>>>>>>>>>>>>>> pipeline.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> removing setCoder call breaks my pipeline.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> No Coder has been manually specified;  you may do
>>>>>>>>>>>>>>>>>>>>>>> so using .setCoder().
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed:
>>>>>>>>>>>>>>>>>>>>>>> Unable to provide a Coder for Data.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>   Building a Coder using a registered CoderProvider
>>>>>>>>>>>>>>>>>>>>>>> failed.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Reason being the code which is building the pipeline
>>>>>>>>>>>>>>>>>>>>>>> is based on Java Generics. Actual pipeline building 
>>>>>>>>>>>>>>>>>>>>>>> code sets a bunch of
>>>>>>>>>>>>>>>>>>>>>>> parameters which are used to construct the pipeline.
>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>>>>>>>>>>>> So I guess I will need to provide some more
>>>>>>>>>>>>>>>>>>>>>>> information to the framework to make the annotation 
>>>>>>>>>>>>>>>>>>>>>>> work.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will 
>>>>>>>>>>>>>>>>>>>>>>>> usually automatically
>>>>>>>>>>>>>>>>>>>>>>>> set up schema inference (you'll have to remove the 
>>>>>>>>>>>>>>>>>>>>>>>> setCoder call).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> How to set up my pipeline to use Beam's schema
>>>>>>>>>>>>>>>>>>>>>>>>> encoding.
>>>>>>>>>>>>>>>>>>>>>>>>> In my current code I am doing something like this
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> PCollection<Data> =
>>>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro
>>>>>>>>>>>>>>>>>>>>>>>>>> coder. Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests 
>>>>>>>>>>>>>>>>>>>>>>>>>>> if incompatible changes
>>>>>>>>>>>>>>>>>>>>>>>>>>> are made to Pojo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <
>>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Check the schema of the avro encoding for the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> POJO before and after the change to ensure that 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> they are compatible as you
>>>>>>>>>>>>>>>>>>>>>>>>>>>> expect.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but asking here in hopes someone has faced a 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> similar problem and can help.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am trying to use "--update" option to update
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a running Dataflow job. I am noticing that 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility checks fail any
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> time I add a new field to my data model. Error 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> says
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for data.  Avro coder to 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> new optional fields to the data should work when 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> updating the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation of the model to something which 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> allows me to update the pipeline in cases when I 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> add new optional fields to existing model. Any 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Reply via email to