Consider multiple instances of a DoFn:

@ProcessElement
public void window(ProcessContext context,
    @StateId("count") ValueState<Integer> countState) {
  int count = MoreObjects.firstNonNull(countState.read(), 0);
  count += 1;
  countState.write(count);
}

If two instances read countState and then write countState, will countState be incremented by only 1, rather than by 2?

Jacob

On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:

> What do you mean by non-atomic?
>
> All output/state changes/timers from a processed bundle are an
> all-or-nothing change. So if processing a bundle fails, any state changes
> are discarded and the state is reset to what it was before the bundle was
> processed.
>
> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Here's a gist:
>> https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>
>> Should I consider StateId value mutations to be non-atomic?
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Feel free to share it with an online paste or a link to a GitHub repo
>>> containing the code.
>>>
>>> Other users may be interested in your solution.
>>>
>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> Lukasz-
>>>>
>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>> timestamp state, and a counter state, along with a buffer of elements
>>>> to bundle. When the counter or timer exceeds its maximum value,
>>>> outputWithTimestamp().
>>>>
>>>> I'm happy to post the entire implementation somewhere; I'm not sure
>>>> about etiquette and how this mailing list handles attachments.
>>>>
>>>> Jacob
>>>>
>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Have you considered using a stateful DoFn? Buffering/batching based
>>>>> upon a certain number of elements is shown in this blog[1] and could
>>>>> be extended for your use case.
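The buffer-then-flush logic at the core of that batching pattern can be sketched independently of the Beam APIs. Below is a minimal plain-Java sketch; the class name BatchingBuffer, the maxElements parameter, and the onFlush callback are illustrative names, not part of Beam. In the actual pattern, the buffer would live in a BagState and the flush would also be triggered from an @OnTimer callback when the stale timer fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the buffer-then-flush logic a stateful DoFn would
// keep in state: accumulate elements, emit a batch once maxElements is
// reached, and allow an explicit flush (as a firing stale timer would).
public class BatchingBuffer<T> {
  private final int maxElements;
  private final Consumer<List<T>> onFlush;
  private final List<T> buffer = new ArrayList<>();

  public BatchingBuffer(int maxElements, Consumer<List<T>> onFlush) {
    this.maxElements = maxElements;
    this.onFlush = onFlush;
  }

  public void add(T element) {
    buffer.add(element);
    if (buffer.size() >= maxElements) {
      flush();
    }
  }

  // In the Beam pattern this would run from an @OnTimer callback as well,
  // so a slow key still gets flushed after the stale interval.
  public void flush() {
    if (!buffer.isEmpty()) {
      onFlush.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```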
>>>>>
>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>
>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>> wrote:
>>>>>
>>>>>> My first streaming pipeline is pretty simple; it just pipes a queue
>>>>>> into files:
>>>>>>
>>>>>> - read JSON objects from PubsubIO
>>>>>> - event time = processing time
>>>>>> - 5 minute windows
>>>>>> - write n files to GCS (TextIO.withNumShards() is not dynamic)
>>>>>>
>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>> stopped for an hour and restarted) this creates problems, because
>>>>>> the amount of data per file becomes too much, and the pipeline
>>>>>> stays behind.
>>>>>>
>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>> GCS":
>>>>>>
>>>>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>>>>
>>>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>>>> accept a PCollectionView<Long> from Count.perElement().
>>>>>>
>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>> element quantity?
>>>>>>
>>>>>> Jacob
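The ceil(totalElements / maxElements) step at the heart of the proposal above is plain integer arithmetic, independent of Beam. A minimal sketch follows; the names ShardMath, shardCount, and shardFor are illustrative, not a Beam API, and the element count would in practice arrive as a side input (e.g. from a Count transform, as the PCollectionView<Long> idea suggests).

```java
// Hypothetical helper for the "ceil(totalElements / maxElements)" step:
// given a total element count and a per-file cap, compute how many
// partitions/shards to write, then route each element to one of them.
public class ShardMath {
  public static int shardCount(long totalElements, long maxElements) {
    if (maxElements <= 0) {
      throw new IllegalArgumentException("maxElements must be positive");
    }
    if (totalElements <= 0) {
      return 1; // always write at least one shard
    }
    // Integer ceiling division, avoiding floating point.
    return (int) ((totalElements + maxElements - 1) / maxElements);
  }

  // A partitioning DoFn could then route each element to a shard, e.g. by
  // a running index or a hash of the element.
  public static int shardFor(long elementIndex, int shardCount) {
    return (int) (elementIndex % shardCount);
  }
}
```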