Hi Reuven,
Can you please point me to some instructions on how to publish
artifacts to the local Maven repository?
`./gradlew publishToMavenLocal` does not seem to be working. I don't see
the 2.37.0-SNAPSHOT artifacts in my local Maven repository.

On Mon, Jan 10, 2022 at 11:42 AM Reuven Lax <[email protected]> wrote:

> Gaurav,
>
> Can you try this out and tell me if it works for you?
> https://github.com/apache/beam/pull/16472
>
> This allows you to annotate all your getters with SchemaFieldNumber, to
> specify the order of fields.
>
> Reuven
>
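A minimal sketch of the idea behind annotating getters with a field number, using only the JDK. The `FieldNumber` annotation, the `Event` bean and `orderedFieldNames` are hypothetical names made up for illustration; this is not Beam's `SchemaFieldNumber` or the code from the pull request. It only shows why an explicit number on each getter gives a stable order even though reflection does not promise one.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class FieldOrderSketch {

    // Hypothetical stand-in for the annotation discussed in this thread; not Beam's class.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface FieldNumber {
        int value();
    }

    // Example bean (an assumption); getters are deliberately declared out of order.
    static class Event {
        @FieldNumber(2) public Integer getCount() { return 0; }
        @FieldNumber(0) public String getId() { return ""; }
        @FieldNumber(1) public String getName() { return ""; }
    }

    // Returns property names sorted by annotation value, so the result does not
    // depend on the order in which getDeclaredMethods() happens to return methods.
    static List<String> orderedFieldNames(Class<?> beanClass) {
        List<Method> getters = new ArrayList<>();
        for (Method m : beanClass.getDeclaredMethods()) {
            if (m.isAnnotationPresent(FieldNumber.class)) {
                getters.add(m);
            }
        }
        getters.sort(Comparator.comparingInt(
                (Method m) -> m.getAnnotation(FieldNumber.class).value()));
        List<String> names = new ArrayList<>();
        for (Method m : getters) {
            String prop = m.getName().substring(3); // strip the "get" prefix
            names.add(Character.toLowerCase(prop.charAt(0)) + prop.substring(1));
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(orderedFieldNames(Event.class)); // prints [id, name, count]
    }
}
```

Sorting by an explicit number rather than by name means a field can be renamed later without changing its position, as long as the number stays the same.
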
> On Mon, Jan 10, 2022 at 9:28 AM Reuven Lax <[email protected]> wrote:
>
>> Yes, timers will get carried over.
>>
>> On Mon, Jan 10, 2022 at 9:19 AM gaurav mishra <
>> [email protected]> wrote:
>>
>>> That DelegateCoder seems like a good idea to keep the DoFn code clean.
>>> In the short term I will perhaps use a JSON string as my intermediary and
>>> let Jackson do the serialization and deserialization. From what I have
>>> learned so far, Dataflow does some magic in the background to re-order
>>> fields to make the PCollection schema compatible with the previous job
>>> (if I use SchemaCoder), so I don't have to switch away from Java beans
>>> yet. I can use the DelegateCoder for my StateSpec and continue using
>>> SchemaCoder for PCollections.
>>>
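A minimal sketch of why a name-keyed intermediary (JSON, or the schema-plus-payload record described further down the thread) survives field reordering and added nullable fields. It uses only the JDK; `Model`, `toFields`, `encode` and `decode` are hypothetical names, and the pair-based byte layout is an assumption chosen for brevity, not Jackson's or Avro's format. In a real pipeline, conversion functions of this shape would presumably be what gets handed to DelegateCoder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

class NameKeyedStateSketch {

    // Hypothetical state model; "region" plays the role of a newly added nullable field.
    static class Model {
        String id;
        String name;
        String region; // may be null
    }

    // Converts the model to name/value pairs; null fields are omitted.
    static Map<String, String> toFields(Model m) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (m.id != null) fields.put("id", m.id);
        if (m.name != null) fields.put("name", m.name);
        if (m.region != null) fields.put("region", m.region);
        return fields;
    }

    // Writes a pair count followed by (name, value) pairs.
    static byte[] encode(Map<String, String> fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fields.size());
            for (Map.Entry<String, String> e : fields.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the pairs back and looks fields up by name, so neither the order of
    // the pairs nor a missing field breaks decoding; absent fields stay null.
    static Model decode(byte[] data) {
        Map<String, String> fields = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                fields.put(key, in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Model m = new Model();
        m.id = fields.get("id");
        m.name = fields.get("name");
        m.region = fields.get("region");
        return m;
    }

    public static void main(String[] args) {
        // Bytes as an older job might have written them: different order, no "region".
        Map<String, String> old = new LinkedHashMap<>();
        old.put("name", "n1");
        old.put("id", "a");
        Model restored = decode(encode(old));
        System.out.println(restored.id + " " + restored.name + " " + restored.region);
    }
}
```

The cost is the one already mentioned in this thread: field names travel with every value and the data is converted twice, so this trades size and CPU for tolerance to schema changes.
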
>>> @Reuven
>>> Regarding timers in stateful DoFn, do they get carried over to the new
>>> job in Dataflow?
>>>
>>>
>>>
>>>
>>> On Mon, Jan 10, 2022 at 3:44 AM Pavel Solomin <[email protected]>
>>> wrote:
>>>
>>>> Hello Gaurav,
>>>>
>>>> It looks like issues related to state evolution come up quite often
>>>> on this mailing list and, unfortunately, do not seem to be covered in
>>>> the Beam docs.
>>>>
>>>> I am not familiar with the Google Dataflow runner, but the way I achieved
>>>> state evolution on the Flink runner was a combination of:
>>>> 1 - version-controlled Avro schemas for generating POJOs (models) which
>>>> were used in stateful Beam DoFns
>>>> 2 - a generic Avro-generated POJO as an intermediary - ( schema:
>>>> string, payload: bytes )
>>>> 3 -
>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/coders/DelegateCoder.html
>>>> responsible for converting a model into an intermediary (2)
>>>>
>>>> This allowed adding and dropping nullable fields and making other
>>>> Avro forward-compatible changes without losing the state. The order of
>>>> fields also didn't matter in this setting. Unfortunately, it also meant
>>>> double serialization and deserialization. But I wasn't able to come up
>>>> with a better solution, given that there seemed to be no public info
>>>> available on the topic, or maybe due to my lack of experience.
>>>>
>>>> Relevant mail threads:
>>>> https://www.mail-archive.com/[email protected]/msg07249.html
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 | Skype: pavel_solomin
>>>> | Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 10 Jan 2022 at 08:43, gaurav mishra <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Jan 10, 2022 at 12:19 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Unfortunately, as you've discovered, Dataflow's update only pays
>>>>>> attention to actual PCollections, and does not look at DoFn state
>>>>>> variables at all (historically, stateful DoFns did not yet exist when
>>>>>> Dataflow implemented its update support).
>>>>>>
>>>>>> You can file a feature request against Google to implement update on
>>>>>> state variables, as this is honestly a major feature gap. In the meantime
>>>>>> there might be some workarounds you can use until this support is added.
>>>>>> e.g.
>>>>>>    - you could convert the data into a protocol buffer before storing
>>>>>> into state; ProtoCoder should handle new fields, and fields in a protocol
>>>>>> buffer have a deterministic order.
>>>>>>
>>>>> Is there an easy way to go from a Java object to a protocol buffer
>>>>> object? The only way I can think of is to first serialize the Java model
>>>>> to JSON and then deserialize the JSON into a protocol buffer object, and
>>>>> the same the other way around, from protobuf to Java object. While I was
>>>>> writing this I realized I could maybe store the JSON dump of my Java
>>>>> object in the state spec. That should also work.
>>>>>
>>>>>>    - you could explicitly use the Row object, although this will
>>>>>> again make your code more complex.
>>>>>>
>>>>>
>>>>>> Another intermediate option - we could fairly easily add
>>>>>> a @SchemaFieldId annotation to Beam that would allow you to specify the
>>>>>> order of fields. You would have to annotate every getter, something like
>>>>>> this:
>>>>>>
>>>>>> @SchemaFieldId(0)
>>>>>> public T getV1();
>>>>>>
>>>>>> @SchemaFieldId(1)
>>>>>> public T getV2();
>>>>>>
>>>>> Yes please. If this is an easy change and you can point me in the
>>>>> right direction, I will be happy to submit a patch for this.
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> etc.
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 10, 2022 at 12:05 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> SerializableCoder can be tricky for updates, and can also be very
>>>>>>> inefficient.
>>>>>>>
>>>>>>> As you noticed, when inferring schemas from a Java class, the order
>>>>>>> of fields is nondeterministic (not much that we can do about this, as
>>>>>>> Java doesn't return the fields in a deterministic order). For regular
>>>>>>> PCollections, Dataflow (though currently only Dataflow) will allow
>>>>>>> update even though the field orders have changed. Unfortunately
>>>>>>>
>>>>>>> On Sun, Jan 9, 2022 at 10:14 PM gaurav mishra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I now have SchemaCoder for all data flowing through the pipeline.
>>>>>>>> I am noticing another issue once a new pipeline is started with the
>>>>>>>> --update flag. In my stateful DoFn, where I am doing prevState.read(),
>>>>>>>> exceptions are being thrown, with errors like
>>>>>>>> Caused by: java.io.EOFException: reached end of stream after
>>>>>>>> reading 68 bytes; 101 bytes expected
>>>>>>>> In this case there was no change in code or data model when
>>>>>>>> relaunching the pipeline.
>>>>>>>>
>>>>>>>> It seems that the Schema for my bean is changing on every run of the
>>>>>>>> pipeline. I manually inspected the Schema returned by the
>>>>>>>> SchemaRegistry by running the pipeline a few times locally, and each
>>>>>>>> time I noticed the order of fields in the Schema was different. This
>>>>>>>> change in field order is breaking the new pipeline. Is there a way to
>>>>>>>> enforce some kind of ordering on the fields?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jan 9, 2022 at 7:37 PM gaurav mishra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Looking at the source code for JavaBeanSchema, it seems I will have
>>>>>>>>> to annotate my getters and setters with some flavour of @Nullable.
>>>>>>>>> Another question:
>>>>>>>>> does the Dataflow update pipeline feature work only with
>>>>>>>>> SchemaCoder? I have SerializableCoder and ProtoCoder being used in
>>>>>>>>> some of my other pipelines. I wonder if I have to change those too to
>>>>>>>>> open the door to updating pipelines there?
>>>>>>>>>
>>>>>>>>> On Sun, Jan 9, 2022 at 6:44 PM gaurav mishra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Some progress made.
>>>>>>>>>> Now I can see that the schema has all the needed fields. But all
>>>>>>>>>> fields in the returned schema are marked as "NOT NULL", even though
>>>>>>>>>> the fields are annotated with @javax.annotation.Nullable.
>>>>>>>>>> Is there a different flavor of @Nullable I need to use on my
>>>>>>>>>> fields?
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 9, 2022 at 5:58 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ah, this isn't a POJO then. In this case, try using
>>>>>>>>>>> @DefaultSchema(JavaBeanSchema.class)
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:55 PM gaurav mishra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It looks something like this. It currently has around 40
>>>>>>>>>>>> fields.
>>>>>>>>>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>>>>>>>> @EqualsAndHashCode
>>>>>>>>>>>> @ToString
>>>>>>>>>>>> public class POJO implements SomeInterface, Serializable {
>>>>>>>>>>>>
>>>>>>>>>>>>     public static final Integer SOME_CONSTANT_FIELD = 2;
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>     private String field1;
>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>     private String field2;
>>>>>>>>>>>>
>>>>>>>>>>>>     @Nullable
>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>     private Integer field3;
>>>>>>>>>>>>
>>>>>>>>>>>>     .....
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>     public String getField1() {
>>>>>>>>>>>>         return field1;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_1")
>>>>>>>>>>>>     public void setField1(String field1) {
>>>>>>>>>>>>         this.field1 = field1;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>     public String getField2() {
>>>>>>>>>>>>         return field2;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_2")
>>>>>>>>>>>>     public void setField2(String field2) {
>>>>>>>>>>>>         this.field2 = field2;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>     public Integer getField3() {
>>>>>>>>>>>>         return field3;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @JsonProperty("field_3")
>>>>>>>>>>>>     public void setField3(Integer field3) {
>>>>>>>>>>>>         this.field3 = field3;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>     public POJO() {
>>>>>>>>>>>>
>>>>>>>>>>>>         // 0-arg empty constructor
>>>>>>>>>>>>     }
>>>>>>>>>>>>     public POJO(POJO other) {
>>>>>>>>>>>>
>>>>>>>>>>>>        // ... copy constructor
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 9, 2022 at 5:26 PM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you paste the code for your Pojo?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 4:09 PM gaurav mishra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> schemaRegistry.getSchema(dataType) is returning an empty schema
>>>>>>>>>>>>>> for me. My POJO is annotated with
>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class).
>>>>>>>>>>>>>> Is there something extra I need to do here to register my
>>>>>>>>>>>>>> class with a schema registry?
>>>>>>>>>>>>>> Note: the code which is building the pipeline is sitting in a
>>>>>>>>>>>>>> library (a different package) which is being imported into my
>>>>>>>>>>>>>> pipeline code. So perhaps there is some missing configuration that
>>>>>>>>>>>>>> would allow the framework to discover my POJO and the annotations
>>>>>>>>>>>>>> associated with it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:47 PM gaurav mishra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:36 PM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 3:10 PM gaurav mishra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think I can make it work now. I found a utility method
>>>>>>>>>>>>>>>>> for building my coder from the class.
>>>>>>>>>>>>>>>>> Something like
>>>>>>>>>>>>>>>>> Class<Data> dataClass = userConfig.getDataClass();
>>>>>>>>>>>>>>>>> Coder<Data> dataCoder =
>>>>>>>>>>>>>>>>>     SchemaCoder.of(schemaRegistry.getSchema(dataClass),
>>>>>>>>>>>>>>>>>                 TypeDescriptor.of(dataClass),
>>>>>>>>>>>>>>>>>                 schemaRegistry.getToRowFunction(dataClass),
>>>>>>>>>>>>>>>>>                 schemaRegistry.getFromRowFunction(dataClass));
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This will work. Though, did annotating the POJO like I said
>>>>>>>>>>>>>>>> not work?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> No, annotation alone does not work since I am not using
>>>>>>>>>>>>>>> concrete classes in the code where the pipeline is being
>>>>>>>>>>>>>>> constructed. <Data> above is a type parameter of the class which is
>>>>>>>>>>>>>>> constructing the pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 2:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Removing the setCoder call breaks my pipeline.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> No Coder has been manually specified;  you may do so
>>>>>>>>>>>>>>>>>> using .setCoder().
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   Inferring a Coder from the CoderRegistry failed:
>>>>>>>>>>>>>>>>>> Unable to provide a Coder for Data.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   Building a Coder using a registered CoderProvider
>>>>>>>>>>>>>>>>>> failed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The reason is that the code which is building the pipeline is
>>>>>>>>>>>>>>>>>> based on Java generics. The actual pipeline-building code sets a
>>>>>>>>>>>>>>>>>> bunch of parameters which are used to construct the pipeline.
>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>> pipeline.apply(userProvidedTransform).get(outputTag).setCoder(userProvidedCoder);
>>>>>>>>>>>>>>>>>> So I guess I will need to provide some more information
>>>>>>>>>>>>>>>>>> to the framework to make the annotation work.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:39 PM Reuven Lax <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you annotate your POJO with
>>>>>>>>>>>>>>>>>>> @DefaultSchema(JavaFieldSchema.class), that will usually
>>>>>>>>>>>>>>>>>>> automatically set up schema inference (you'll have to remove the
>>>>>>>>>>>>>>>>>>> setCoder call).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:32 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> How do I set up my pipeline to use Beam's schema encoding?
>>>>>>>>>>>>>>>>>>>> In my current code I am doing something like this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> PCollection<Data> stream =
>>>>>>>>>>>>>>>>>>>> pipeline.apply(someTransform).get(outputTag).setCoder(AvroCoder.of(Data.class));
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:16 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I don't think we make any guarantees about Avro coder.
>>>>>>>>>>>>>>>>>>>>> Can you use Beam's schema encoding instead?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Jan 9, 2022 at 1:14 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Is there a way to programmatically check for
>>>>>>>>>>>>>>>>>>>>>> compatibility? I would like to fail my unit tests if
>>>>>>>>>>>>>>>>>>>>>> incompatible changes are made to the POJO.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:49 PM Luke Cwik <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Check the schema of the Avro encoding for the POJO
>>>>>>>>>>>>>>>>>>>>>>> before and after the change to ensure that they are
>>>>>>>>>>>>>>>>>>>>>>> compatible as you expect.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 7, 2022 at 4:12 PM gaurav mishra <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> This is more of a Dataflow question I guess, but I am
>>>>>>>>>>>>>>>>>>>>>>>> asking here in the hope that someone has faced a similar
>>>>>>>>>>>>>>>>>>>>>>>> problem and can help.
>>>>>>>>>>>>>>>>>>>>>>>> I am trying to use the "--update" option to update a
>>>>>>>>>>>>>>>>>>>>>>>> running Dataflow job. I am noticing that compatibility
>>>>>>>>>>>>>>>>>>>>>>>> checks fail any time I add a new field to my data model.
>>>>>>>>>>>>>>>>>>>>>>>> The error says
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The Coder or type for step XYZ  has changed
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I am using a Java POJO for the data and the Avro coder to
>>>>>>>>>>>>>>>>>>>>>>>> serialize the model. I read somewhere that adding new
>>>>>>>>>>>>>>>>>>>>>>>> optional fields to the data should work when updating
>>>>>>>>>>>>>>>>>>>>>>>> the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I am fine with updating the coder or the implementation of
>>>>>>>>>>>>>>>>>>>>>>>> the model to something which allows me to update the
>>>>>>>>>>>>>>>>>>>>>>>> pipeline in cases where I add new optional fields to the
>>>>>>>>>>>>>>>>>>>>>>>> existing model. Any suggestions?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
