Hi Luke,

Thank you for your explanation. It was a great help to me.

Regards,
Hamed

On Thu, 19 Aug 2021 at 19:35, Luke Cwik <[email protected]> wrote:

> This is an example[1] of how to perform a read+write transaction which you
> could place in a DoFn. The issue with such an approach is that if something
> fails after the transaction is committed in Spanner but before the bundle
> is successfully completed then the bundle will be retried and you will
> increment the value multiple times. For some pipelines this is ok but if
> you need to guard against this then you will want to use a stateful DoFn[2]
> that keeps track of what the COLUMN1 value is and use it as the source of
> truth when running the pipeline. Your logic would look something like:
>
> processElement(...) {
>   firstRead = false;
>   column1 = readFromStateValue();
>   if (column1 is null) {
>     firstRead = true;
>     column1 = readValueFromSpanner();
>   }
>   column1 = column1 + newValue;
>   writeValueToState(column1);
>   if (firstRead) {
>     // Since this is the first time we read, we don't want to write to
>     // Spanner till the state is durably committed, otherwise if the bundle
>     // fails the next first read will have the wrong value.
>     scheduleProcessingTimerToWriteValueToSpanner();
>   } else {
>     writeValueToSpanner(column1);
>   }
> }
>
> Note that the stateful DoFn will run serially for each unique key and
> window, which typically means that you want to be using the global window
> and one key per table and/or (table, column) you're updating.
>
> 1:
> https://cloud.google.com/spanner/docs/modify-mutation-api#updating_rows_in_a_table
> 2: https://beam.apache.org/blog/stateful-processing/
>
> On Thu, Aug 19, 2021 at 4:04 AM Hamed Amini <[email protected]>
> wrote:
>
>> Hi guys,
>>
>> I am working on an accumulation task with Dataflow; I want to consume
>> events from Pubsub and accumulate and insert them into Spanner. For the
>> inserting part I want to increment the value of the count column in my
>> table; with DML I can do it as below:
>>
>> "UPDATE MY_TABLE set COLUMN1 = COLUMN1 + newValue"
>>
>> but I don't know how I can do that with a mutation (or with the Beam
>> Spanner API)! Could you please help me?
>>
>>
>> Regards
>> Hamed
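
For anyone reading this thread later, here is a rough sketch of why the retry matters. It is a plain-Python simulation, not Beam or Spanner code: FakeSpanner, BundleFailure, StatefulIncrement and with_one_retry are all hypothetical names made up for illustration. It handles one element per bundle, and it models the runner's "commit state only if the bundle succeeds" behaviour by hand. It follows the quoted pseudocode under those assumptions.

```python
class FakeSpanner:
    """In-memory stand-in for the COLUMN1 cell; not a real Spanner client."""

    def __init__(self, value):
        self.value = value


class BundleFailure(Exception):
    """Simulates a bundle failing before the runner commits its results."""


def naive_increment(spanner, new_value, fail):
    # Read+write transaction: COLUMN1 = COLUMN1 + new_value, committed at once.
    spanner.value += new_value
    if fail:
        # The Spanner commit already happened, so a retry will add again.
        raise BundleFailure()


class StatefulIncrement:
    """Follows the quoted pseudocode, one element per bundle for simplicity."""

    def __init__(self, spanner):
        self.spanner = spanner
        self.state = None        # durably committed per-key state
        self.timer_set = False   # durably committed processing-time timer

    def process_element(self, new_value, fail):
        column1 = self.state
        first_read = False
        if column1 is None:
            first_read = True
            column1 = self.spanner.value
        column1 += new_value
        if not first_read:
            # Absolute write: repeating it on a retry gives the same result.
            self.spanner.value = column1
        if fail:
            # State and timer changes of a failed bundle are discarded.
            raise BundleFailure()
        # Bundle succeeded: state (and the timer, on first read) become durable.
        self.state = column1
        if first_read:
            self.timer_set = True

    def fire_timer(self):
        # Runs only after the state was committed, so the value is safe to write.
        if self.timer_set:
            self.spanner.value = self.state
            self.timer_set = False


def with_one_retry(step, new_value):
    """Runs step once with an injected failure, then retries it successfully."""
    try:
        step(new_value, True)
    except BundleFailure:
        step(new_value, False)


naive_db = FakeSpanner(10)
with_one_retry(lambda v, fail: naive_increment(naive_db, v, fail), 5)

stateful_db = FakeSpanner(10)
fn = StatefulIncrement(stateful_db)
with_one_retry(fn.process_element, 5)  # first read, fails once, then retried
fn.fire_timer()
with_one_retry(fn.process_element, 3)  # later element, fails once, then retried

print(naive_db.value, stateful_db.value)  # prints "20 18"
```

Starting from 10, the naive version ends at 20 instead of 15 because the +5 is applied twice. The state-backed version ends at 18 (10 + 5 + 3) even though both bundles were retried, since the first write is deferred to the timer and later writes are absolute values rather than increments.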
