Consider multiple instances of a DoFn:

    @ProcessElement
    public void window(ProcessContext context,
        @StateId("count") ValueState<Integer> countState) {
      int count = MoreObjects.firstNonNull(countState.read(), 0);
      count += 1;
      countState.write(count);
    }

If two instances read countState and then both write countState, will
countState end up incremented by only 1 instead of 2?
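For reference, the read-modify-write race in question can be sketched in plain Java. This models a hypothetical shared counter standing in for ValueState (which, in Beam, is scoped per key and window), so it only illustrates the interleaving being asked about, not Beam's actual execution:

```java
// Plain-Java illustration of the lost-update race described above.
// Two "instances" each read the current count, increment it, and write
// it back; if their reads interleave before either write, one increment
// is lost.
class LostUpdateDemo {
    static int simulate(boolean interleaved) {
        int state = 0;
        if (interleaved) {
            int a = state;     // instance A reads 0
            int b = state;     // instance B reads 0 before A writes
            state = a + 1;     // A writes 1
            state = b + 1;     // B also writes 1 -> one increment lost
        } else {
            state = state + 1; // A completes its read-modify-write
            state = state + 1; // B sees A's write, result is 2
        }
        return state;
    }
}
```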

Jacob

On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:

> What do you mean by non-atomic?
>
> All outputs/state changes/timers from processing a bundle are applied as
> an all-or-nothing change. So if processing a bundle fails, any state
> changes are discarded and the state is reset to what it was before the
> bundle was processed.
>
> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Here's a gist:
>> https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>
>> Should I consider StateId value mutations to be non-atomic?
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Feel free to share it with an online paste or a link to a github repo
>>> containing the code.
>>>
>>> Other users may be interested in your solution.
>>>
>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> Lukasz-
>>>>
>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>> bundle. When the counter or timer exceeds its max value, the buffer
>>>> is flushed with outputWithTimestamp().
>>>>
>>>> I'm happy to post the entire implementation somewhere, not sure about
>>>> etiquette and how this mailing list handles attachments.
>>>>
>>>> Jacob
>>>>
>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Have you considered using a stateful DoFn? Buffering/batching based
>>>>> on a certain number of elements is shown in this blog[1] and could be
>>>>> extended for your use case.
>>>>>
>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
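The element-count trigger at the heart of that batching pattern can be sketched in plain Java, independent of the Beam API (BatchBuffer and MAX_BUFFER_SIZE are illustrative names; in a real stateful DoFn the buffer would live in a BagState, with a timer flushing stragglers):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of count-based batching: elements accumulate in a buffer and
// are emitted as one batch once MAX_BUFFER_SIZE is reached. flush() is
// also what a stale timer (or finishBundle) would call to emit a
// partial batch.
class BatchBuffer<T> {
    static final int MAX_BUFFER_SIZE = 3; // illustrative threshold

    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> flushed = new ArrayList<>();

    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= MAX_BUFFER_SIZE) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<T>> batches() {
        return flushed;
    }
}
```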
>>>>>
>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>> wrote:
>>>>>
>>>>>> My first streaming pipeline is pretty simple, it just pipes a queue
>>>>>> into files:
>>>>>>
>>>>>> - read JSON objects from PubsubIO
>>>>>> - event time = processing time
>>>>>> - 5 minute windows
>>>>>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>>>>>
>>>>>> When the pipeline gets behind (for example, when it is stopped for
>>>>>> an hour and restarted), this creates problems: the amount of data
>>>>>> per file becomes too large, and the pipeline stays behind.
>>>>>>
>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>> GCS":
>>>>>>
>>>>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>>>>
>>>>>> My next idea is to implement my own Partition and PartitionDoFn
>>>>>> that accept a PCollectionView<Long> from Count.perElement().
>>>>>>
>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>> element quantity?
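The group-count arithmetic behind that idea can be sketched in plain Java (DynamicPartition and its methods are hypothetical names for illustration, not Beam API):

```java
// Sketch of the proposed dynamic partitioning: the number of groups is
// ceil(totalElements / maxElements), and each element is assigned to a
// group round-robin by its index.
class DynamicPartition {
    static int numGroups(long totalElements, long maxElements) {
        // integer ceiling division
        return (int) ((totalElements + maxElements - 1) / maxElements);
    }

    static int partitionFor(long elementIndex, int groups) {
        // round-robin assignment keeps every group under maxElements
        return (int) (elementIndex % groups);
    }
}
```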
>>>>>>
>>>>>> Jacob
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
