Hi Luke,

Thank you for your explanation. It was a great help to me.

Regards,
Hamed

On Thu, 19 Aug 2021 at 19:35, Luke Cwik <[email protected]> wrote:

> This is an example[1] of how to perform a read+write transaction which you
> could place in a DoFn. The issue with such an approach is that if something
> fails after the transaction is committed in Spanner but before the bundle
> is successfully completed then the bundle will be retried and you will
> increment the value multiple times. For some pipelines this is ok but if
> you need to guard against this then you will want to use a stateful DoFn[2]
> that keeps track of what the COLUMN1 value is and use it as the source of
> truth when running the pipeline. Your logic would look something like:
>
> processElement(...) {
>   firstRead = false;
>   column1 = readFromStateValue();
>   if (column1 is null) {
>     firstRead = true;
>     column1 = readValueFromSpanner();
>   }
>   column1 = column1 + newValue;
>   writeValueToState(column1);
>   if (firstRead) {
>     // Since this is the first time we read, we don't want to write to
>     // Spanner until the state is durably committed; otherwise, if the
>     // bundle fails, the next first read will have the wrong value.
>     scheduleProcessingTimerToWriteValueToSpanner();
>   } else {
>     writeValueToSpanner(column1);
>   }
> }
>
> Note that the stateful DoFn will run serially for each unique key and
> window, which typically means that you want to use the global window
> and one key per table and/or per (table, column) you're updating.
>
> 1:
> https://cloud.google.com/spanner/docs/modify-mutation-api#updating_rows_in_a_table
> 2: https://beam.apache.org/blog/stateful-processing/
>
> On Thu, Aug 19, 2021 at 4:04 AM Hamed Amini <[email protected]>
> wrote:
>
>> Hi guys,
>>
>> I am working on an accumulation task with Dataflow; I want to consume
>> events from Pubsub, accumulate them, and insert them into Spanner. For
>> the insert step I want to increment the value of the count column in my
>> table; with DML I can do it as below:
>>
>> "UPDATE MY_TABLE set COLUMN1 = COLUMN1 + newValue"
>>
>> but I don't know how I can do that with a mutation (or with the Beam
>> Spanner API). Could you please help me?
>>
>>
>> Regards
>> Hamed
>>
>
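
The retry problem described in the reply above can be illustrated with a small simulation. This is only a sketch under stated assumptions, not Beam or Spanner code: a plain dict stands in for the Spanner table, another dict stands in for the DoFn's per-key state, each call to process() is treated as a one-element bundle, and the class names, run_with_one_retry, and the fail_before_commit flag are all hypothetical names invented for this illustration. It assumes that state (and any timer) set in a failed bundle is discarded, while a write already sent to Spanner is not.

```python
class NaiveCounter:
    """Read-modify-write directly against the simulated Spanner row."""

    def __init__(self, spanner):
        self.spanner = spanner  # dict standing in for the Spanner table

    def process(self, key, new_value, fail_before_commit=False):
        # The Spanner transaction commits here, before the bundle completes.
        self.spanner[key] = self.spanner[key] + new_value
        if fail_before_commit:
            raise RuntimeError("bundle failed after the Spanner commit")


class StatefulCounter:
    """Keeps the running value in per-key state and treats it as the source of truth."""

    def __init__(self, spanner):
        self.spanner = spanner
        self.state = {}  # dict standing in for durable per-key state

    def process(self, key, new_value, fail_before_commit=False):
        column1 = self.state.get(key)
        first_read = column1 is None
        if first_read:
            column1 = self.spanner[key]
        column1 = column1 + new_value
        if not first_read:
            # Writes the absolute value, so repeating it on a retry is harmless.
            self.spanner[key] = column1
        if fail_before_commit:
            # State and timer are discarded; nothing below runs.
            raise RuntimeError("bundle failed before state was committed")
        self.state[key] = column1  # bundle commits, state becomes durable
        if first_read:
            # Stands in for the processing-time timer firing after the commit.
            self.spanner[key] = column1


def run_with_one_retry(counter, key, new_value):
    """Simulate a bundle that fails once and is then retried successfully."""
    try:
        counter.process(key, new_value, fail_before_commit=True)
    except RuntimeError:
        counter.process(key, new_value)


naive_db = {"row": 10}
naive = NaiveCounter(naive_db)
run_with_one_retry(naive, "row", 5)
run_with_one_retry(naive, "row", 3)

stateful_db = {"row": 10}
stateful = StatefulCounter(stateful_db)
run_with_one_retry(stateful, "row", 5)
run_with_one_retry(stateful, "row", 3)

print("naive:", naive_db["row"])
print("stateful:", stateful_db["row"])
```

In this simulation the naive counter applies each increment twice because the Spanner write survives the failed bundle, whereas the state-backed counter ends at the intended total of 10 + 5 + 3. Bundles with several elements for the same key would need more care than this one-element model shows.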
