The calls are essentially atomic per-key.

More specifically, the two calls can occur in one of two ways:

1) They are for elements which share a key. If so, the calls _must_ be made
serially, so the second read() will see the result of the first write().
2) They are for elements which do not share a key. If so, the state
instances are unique and independent, so each count will contain 1.
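
To make the two cases concrete, here is a rough plain-Java model of the
behavior. It is not Beam code: the class PerKeyCounterSketch and its methods
are made up for illustration, a HashMap stands in for the per-key ValueState
cells, and `synchronized` stands in for the runner's guarantee that calls for
one key are made serially.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of per-key state; NOT the Beam API.
class PerKeyCounterSketch {
    // One independent cell per key, standing in for ValueState<Integer> "count".
    private final Map<String, Integer> countState = new HashMap<>();

    // Mirrors the read / increment / write sequence from the question.
    // synchronized serializes all calls, which is coarser than the per-key
    // serialization a runner needs, but it is enough for this model.
    synchronized int processElement(String key) {
        Integer stored = countState.get(key);       // like countState.read()
        int count = (stored == null) ? 0 : stored;  // like firstNonNull(read(), 0)
        count += 1;
        countState.put(key, count);                 // like countState.write(count)
        return count;
    }

    // Returns the current count for a key, or 0 if nothing was written yet.
    synchronized int read(String key) {
        return countState.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        PerKeyCounterSketch fn = new PerKeyCounterSketch();
        fn.processElement("a"); // case 1: two elements share key "a"
        fn.processElement("a");
        fn.processElement("b"); // case 2: key "b" has its own cell
        System.out.println("a=" + fn.read("a") + " b=" + fn.read("b"));
    }
}
```

In this model the two calls for "a" leave its count at 2, while "b" ends at 1
because its cell is never touched by the calls for "a".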

As an aside, I unfortunately missed some of this thread - part of your
implementation may duplicate the 'GroupIntoBatches' PTransform in the Core SDK.
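
For anyone who has not used it, GroupIntoBatches takes keyed elements and
emits them in batches of up to a given size per key. The count-based part of
that idea can be modelled roughly in plain Java as below. This is only a
sketch, not the SDK's implementation: CountBatcherSketch and add() are
hypothetical names, and flushing a partly filled buffer (for example when a
window ends) is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of count-based batching per key; NOT the Beam API.
class CountBatcherSketch {
    private final int batchSize;
    // One buffer per key, standing in for per-key state.
    private final Map<String, List<String>> buffers = new HashMap<>();

    CountBatcherSketch(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
    }

    // Buffers the value under its key. Returns the full batch once the key
    // has batchSize buffered values, otherwise returns null.
    List<String> add(String key, String value) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < batchSize) {
            return null;
        }
        buffers.remove(key); // start a fresh buffer for the next batch
        return buffer;
    }
}
```

With a batch size of 2, the first add() for a key returns null and the second
returns both values; other keys are buffered separately.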

On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <jmar...@kochava.com> wrote:

> Consider multiple instances of a DoFn:
>
>     @ProcessElement
>     public void window(ProcessContext context,
>         @StateId("count") ValueState<Integer> countState) {
>
>       // read() returns null until the first write()
>       int count = MoreObjects.firstNonNull(countState.read(), 0);
>       count += 1;
>       countState.write(count);
>     }
>
> If two instances read countState, then write countState, will countState
> be incremented by 1 rather than by 2?
>
> Jacob
>
> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> What do you mean by non-atomic?
>>
>> All output/state changes/timers from a process bundle are an all or
>> nothing change. So if processing a bundle fails, any state changes are
>> discarded and the state is reset to what it was before the bundle was
>> processed.
>>
>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
>> wrote:
>>
>>> Here's a gist:
>>> https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>
>>> Should I consider StateId value mutations to be non-atomic?
>>>
>>> Jacob
>>>
>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Feel free to share it with an online paste or a link to a github repo
>>>> containing the code.
>>>>
>>>> Other users may be interested in your solution.
>>>>
>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>>> wrote:
>>>>
>>>>> Lukasz-
>>>>>
>>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>>> bundle. When the counter or the timer exceeds its maximum value, the DoFn
>>>>> calls outputWithTimestamp().
>>>>>
>>>>> I'm happy to post the entire implementation somewhere; I'm not sure about
>>>>> etiquette or how this mailing list handles attachments.
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Have you considered using a stateful DoFn? Buffering/batching based
>>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>>> extended for your use case.
>>>>>>
>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>>> wrote:
>>>>>>
>>>>>>> My first streaming pipeline is pretty simple, it just pipes a queue
>>>>>>> into files:
>>>>>>>
>>>>>>> - read JSON objects from PubsubIO
>>>>>>> - event time = processing time
>>>>>>> - 5 minute windows (
>>>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>>>
>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>> stopped for an hour and restarted) this creates problems because the
>>>>>>> amount of data per file becomes too much, and the pipeline stays behind.
>>>>>>>
>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>> GCS":
>>>>>>>
>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>> groups
>>>>>>>
>>>>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>>>>> accept a PCollectionView<Long> from Count.perElement().
>>>>>>>
>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>> element quantity?
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
