On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:

> Unfortunately, as you've discovered, Dataflow's update only pays attention
> to actual PCollections, and does not look at DoFn state variables at all
> (historically, stateful DoFns did not yet exist when Dataflow implemented
> its update support).
>
> You can file a feature request against Google to implement update on state
> variables, as this is honestly a major feature gap. In the meantime there
> might be some workarounds you can use until this support is added. e.g.
>    - you could convert the data into a protocol buffer before storing into
> state; ProtoCoder should handle new fields, and fields in a protocol buffer
> have a deterministic order.
>
Is there an easy way to go from a java object to a protocol buffer
object. The only way I can think of is to first serialize the java model to
json and then deserialize the json into a proto buffer object.  Same for
the other way around - protobuf to java object.  while I was writing this I
realized I can maybe store the json dump of my java object in state spec.
That should also work.

>    - you could explicitly use the Row object, although this will again
> make your code more complex.
>

> Another intermediate option - we could fairly easily add a @SchemaFieldId
> annotation to Beam that would allow you to specify the order of fields. You
> would have to annotate every getter, something like this:
>
> @ScheamFieldId(0)
> public T getV1();
>
> @SchemaFieldId(1)
> public T getV2();
>
Yes please. If this is an easy change and If you can point me in the right
direction I will be happy to submit a patch for this.



>
> etc.
>
>
> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>
>> SerializableCoder can be tricky for updates, and can also be very
>> inefficient.
>>
>> As you noticed, when inferring schemas from a Java class, the order of
>> fields is non deterministic (not much that we can do about this, as Java
>> doesn't return the fields in a deterministic order). For regular
>> PCollections, Dataflow (though currently only Dataflow) will allow update
>> even though the field orders have changed. Unfortunately
>>
>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> I now have SchemaCoder for all data flowing through the pipeline.
>>> I am noticing another issue once a new pipeline is started with --update
>>> flag. In my stateful DoFn where I am doing prevState.read(), exceptions are
>>> being thrown. errors like
>>> Caused by: java.io.EOFException: reached end of stream after reading 68
>>> bytes; 101 bytes expected
>>> In this case there was no change in code or data model when relaunching
>>> the pipeline.
>>>
>>> It seems that Schema for my Bean is changing on every run of the
>>> pipeline. I manually inspected the Schema returned by the SchemaRegistry by
>>> running the pipeline a few times locally and each time I noticed the order
>>> of fields in the Schema was different. This changing of order of fields is
>>> breaking the new pipeline. Is there a way to enforce some kind of ordering
>>> on the fields?
>>>
>>>
>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>> [email protected]> wrote:
>>>
>>>> Looking at source code for the JavaBeamSchema it seems I will have to
>>>> annotate my getter and setters with any flavour of @Nullable.
>>>> Another question -
>>>> does the dataflow update pipeline feature work only with SchemaCoder? I
>>>> have SerializableCoder and ProtoCoder being used in some of my other
>>>> pipelines. I wonder if I have to change those too to open doors for
>>>> updating pipelines there?
>>>>
>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> Some progress made.
>>>>> Now I can see that schema has all the needed fields. But all fields in
>>>>> the returned schema are marked as "NOT NULL". even though the fields are
>>>>> annotated with @javax.annotation.Nullable.
>>>>> Is there a different flavor of @Nullable I need to use on my fields?
>>>>>
>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>> DefaultSchema(JavaBeanSchema.class)
>>>>>>
>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> It looks something like this. It currently has around 40 fields.
>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>> @EqualsAndHashCode
>>>>>>> @ToString
>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>
>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>
>>>>>>>     @JsonProperty("field_1")
>>>>>>>     private String field1;
>>>>>>>     @JsonProperty("field_2")
>>>>>>>     private String field2;
>>>>>>>
>>>>>>>     @Nullable
>>>>>>>     @JsonProperty("field_3")
>>>>>>>     private Integer field3;
>>>>>>>
>>>>>>>     .....
>>>>>>>
>>>>>>>     @JsonProperty("field_1")
>>>>>>>     public String getField1() {
>>>>>>>         return field1;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_1")
>>>>>>>     public void setField1(String field1) {
>>>>>>>         this.field1 = field1;
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     @JsonProperty("field_2")
>>>>>>>     public String getField2() {
>>>>>>>         return field2;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_2")
>>>>>>>     public void setField2(String field2) {
>>>>>>>         this.field2 = field2;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_3")
>>>>>>>     public Integer getField3() {
>>>>>>>         return field3;
>>>>>>>     }
>>>>>>>
>>>>>>>     @JsonProperty("field_3")
>>>>>>>     public void setField3(Integer field3) {
>>>>>>>         this.field3 = field3;
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     public POJO() {
>>>>>>>
>>>>>>>          0 arg empty constructor
>>>>>>>     }
>>>>>>>     public POJO(POJO other) {
>>>>>>>
>>>>>>>        ... copy constructor
>>>>>>>     }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> schemaRegistry.getSchema(dataType) for me is returning a
>>>>>>>>> empty schema.
>>>>>>>>> my Pojo is annotated with the @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>> Is there something extra I need to do here to register my class
>>>>>>>>> with a schema registry.
>>>>>>>>> Note: the code which is building the pipeline is sitting in a
>>>>>>>>> library (Different package) which is being imported into my pipeline 
>>>>>>>>> code.
>>>>>>>>> So perhaps there is some configuration which is missing which allows 
>>>>>>>>> the
>>>>>>>>> framework to discover my Pojo and the annotations associated with it.
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think I can make it work now. I found a utility method for
>>>>>>>>>>>> building my coder from class
>>>>>>>>>>>> Something like
>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>> SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This will work. Though, did annotating the POJO like I said not
>>>>>>>>>>> work?
>>>>>>>>>>>
>>>>>>>>>>  No, annotation alone does not work since I am not using concrete
>>>>>>>>>> classes in the code where the pipeline is being constructed. <Data> 
>>>>>>>>>> above
>>>>>>>>>> is a template variable in the class which is constructing the 
>>>>>>>>>> pipeline.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> removing setCoder call breaks my pipeline.
>>>>>>>>>>>>>
>>>>>>>>>>>>> No Coder has been manually specified;  you may do so using
>>>>>>>>>>>>> .setCoder().
>>>>>>>>>>>>>
>>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed: Unable to
>>>>>>>>>>>>> provide a Coder for Data.
>>>>>>>>>>>>>
>>>>>>>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Reason being the code which is building the pipeline is based
>>>>>>>>>>>>> on Java Generics. Actual pipeline building code sets a bunch of 
>>>>>>>>>>>>> parameters
>>>>>>>>>>>>> which are used to construct the pipeline.
>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder)
>>>>>>>>>>>>> So I guess I will need to provide some more information to the
>>>>>>>>>>>>> framework to make the annotation work.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you annotate your POJO
>>>>>>>>>>>>>> with @DefaultSchema(JavaFieldSchema.class), that will usually 
>>>>>>>>>>>>>> automatically
>>>>>>>>>>>>>> set up schema inference (you'll have to remove the setCoder 
>>>>>>>>>>>>>> call).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How to set up my pipeline to use Beam's schema encoding.
>>>>>>>>>>>>>>> In my current code I am doing something like this
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> PCollection<Data> =
>>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro coder. Can
>>>>>>>>>>>>>>>> you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests if 
>>>>>>>>>>>>>>>>> incompatible changes
>>>>>>>>>>>>>>>>> are made to Pojo.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Check the schema of the avro encoding for the POJO before
>>>>>>>>>>>>>>>>>> and after the change to ensure that they are compatible as 
>>>>>>>>>>>>>>>>>> you expect.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess but asking
>>>>>>>>>>>>>>>>>>> here in hopes someone has faced a similar problem and can 
>>>>>>>>>>>>>>>>>>> help.
>>>>>>>>>>>>>>>>>>> I am trying to use "--update" option to update a running
>>>>>>>>>>>>>>>>>>> Dataflow job. I am noticing that compatibility checks fail 
>>>>>>>>>>>>>>>>>>> any time I add a
>>>>>>>>>>>>>>>>>>> new field to my data model. Error says
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am using a Java Pojo for data.  Avro coder to serialize 
>>>>>>>>>>>>>>>>>>> the model. I read somewhere that adding new optional fields 
>>>>>>>>>>>>>>>>>>> to the data should work when updating the pipeline.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or implementation of the 
>>>>>>>>>>>>>>>>>>> model to something which allows me to update the pipeline 
>>>>>>>>>>>>>>>>>>> in cases when I add new optional fields to existing model. 
>>>>>>>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>

Reply via email to