The calls are essentially atomic per-key. More specifically, the two calls can occur in one of two ways:
1) They are for elements which share a key. If so, the calls _must_ be made serially, so the second read() will see the result of the first write().
2) They are for elements which do not share a key. If so, the state instances are unique and independent, so both will contain 1.

As an aside, I unfortunately missed some of this earlier: part of this change may duplicate the 'GroupIntoBatches' PTransform within the Core SDK.

On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <jmar...@kochava.com> wrote:
> Consider multiple instances of a DoFn:
>
> @ProcessElement
> public void window(ProcessContext context,
>     @StateId("count") ValueState<Integer> countState) {
>
>   int count = MoreObjects.firstNonNull(countState.read(), 0);
>   count += 1;
>   countState.write(count);
> }
>
> If two instances read countState, then write countState, will countState
> be incremented by 1 but not by 2?
>
> Jacob
>
> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> What do you mean by non-atomic?
>>
>> All output/state changes/timers from processing a bundle are an
>> all-or-nothing change. So if processing a bundle fails, any state changes
>> are discarded and the state is reset to what it was before the bundle was
>> processed.
>>
>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
>> wrote:
>>
>>> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>
>>> Should I consider StateId value mutations to be non-atomic?
>>>
>>> Jacob
>>>
>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Feel free to share it with an online paste or a link to a GitHub repo
>>>> containing the code.
>>>>
>>>> Other users may be interested in your solution.
>>>>
>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>>> wrote:
>>>>
>>>>> Lukasz-
>>>>>
>>>>> That worked.
>>>>> I created a stateful DoFn with a stale timer, an initial
>>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>>> bundle. When the counter or timer exceeds its max value, I call
>>>>> outputWithTimestamp().
>>>>>
>>>>> I'm happy to post the entire implementation somewhere, but I'm not sure
>>>>> about the etiquette or how this mailing list handles attachments.
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Have you considered using a stateful DoFn? Buffering/batching based
>>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>>> extended for your use case.
>>>>>>
>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>>> wrote:
>>>>>>
>>>>>>> My first streaming pipeline is pretty simple; it just pipes a queue
>>>>>>> into files:
>>>>>>>
>>>>>>> - read JSON objects from PubsubIO
>>>>>>> - event time = processing time
>>>>>>> - 5 minute windows
>>>>>>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>>>>>>
>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>> stopped for an hour and restarted) this creates problems because the
>>>>>>> amount of data per file becomes too much, and the pipeline stays
>>>>>>> behind.
>>>>>>>
>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>> GCS":
>>>>>>>
>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>>   groups
>>>>>>>
>>>>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>>>>> accept a PCollectionView<Long> from Count.perElement().
>>>>>>>
>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>> element quantity?
>>>>>>>
>>>>>>> Jacob
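To make the per-key behavior from the top of this reply concrete, here is a minimal plain-Java model of a count-based batcher like the one Jacob describes. It is a sketch, not Beam code: the class PerKeyBatcher and its methods are hypothetical, ordinary maps stand in for Beam's per-key state cells, and synchronized methods stand in for the runner processing a key's calls serially. In a real pipeline, the core SDK's GroupIntoBatches.ofSize(...) transform may already cover this use case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model of a count-based, per-key batcher. NOT the Beam API:
 * the maps are hypothetical stand-ins for per-key state cells, and the
 * synchronized methods stand in for the runner processing a key serially.
 * (synchronized serializes calls for ALL keys, which is stronger than the
 * per-key guarantee, but sufficient for this model.)
 */
final class PerKeyBatcher {
    private final int maxElements;
    // Per-key element buffer; TreeMap keeps flush order deterministic (by key).
    private final Map<String, List<String>> buffers = new TreeMap<>();
    // Per-key counter, analogous to a ValueState<Integer> "count" cell.
    private final Map<String, Integer> counts = new HashMap<>();
    // Batches emitted so far, in flush order.
    private final List<List<String>> output = new ArrayList<>();

    PerKeyBatcher(int maxElements) {
        if (maxElements <= 0) {
            throw new IllegalArgumentException("maxElements must be positive");
        }
        this.maxElements = maxElements;
    }

    /** Read-modify-write of one key's state; other keys' state is never touched. */
    synchronized void process(String key, String element) {
        int count = counts.getOrDefault(key, 0); // like firstNonNull(read(), 0)
        count += 1;
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(element);
        if (count >= maxElements) {
            // Buffer is full: emit it as one batch and reset this key's state.
            output.add(new ArrayList<>(buffer));
            buffer.clear();
            count = 0;
        }
        counts.put(key, count);
    }

    /** Emits every non-empty partial batch, loosely like a stale timer firing for each key. */
    synchronized void flushAll() {
        for (List<String> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                output.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        counts.clear();
    }

    /** Number of buffered (not yet emitted) elements for a key. */
    synchronized int pendingCount(String key) {
        return counts.getOrDefault(key, 0);
    }

    /** Copy of the batches emitted so far. */
    synchronized List<List<String>> output() {
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        PerKeyBatcher batcher = new PerKeyBatcher(2);
        batcher.process("a", "a1");
        batcher.process("b", "b1");
        batcher.process("a", "a2"); // key "a" reaches 2 elements: emits [a1, a2]
        batcher.process("a", "a3");
        batcher.flushAll();         // emits the partial batches [a3] and [b1]
        System.out.println(batcher.output());
    }
}
```

Running main prints [[a1, a2], [a3], [b1]]: key "a" fills a batch of two on its own, while "b1" waits in its own independent buffer until the final flush, matching case 2) above, where elements with different keys never see each other's state.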