What do you mean by non-atomic? All outputs, state changes, and timers from a processed bundle are an all-or-nothing change: if processing a bundle fails, any state changes are discarded and the state is reset to what it was before the bundle was processed.
On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <[email protected]> wrote:

> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>
> Should I consider StateId value mutations to be non-atomic?
>
> Jacob
>
> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Feel free to share it with an online paste or a link to a github repo containing the code.
>>
>> Other users may be interested in your solution.
>>
>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]> wrote:
>>
>>> Lukasz-
>>>
>>> That worked. I created a stateful DoFn with a stale timer, an initial timestamp state, and a counter state, along with a buffer of elements to bundle. When the counter or the timer exceeds its max value, outputWithTimestamp().
>>>
>>> I'm happy to post the entire implementation somewhere; not sure about etiquette and how this mailing list handles attachments.
>>>
>>> Jacob
>>>
>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Have you considered using a stateful DoFn? Buffering/batching based upon a certain number of elements is shown in this blog [1] and could be extended for your use case.
>>>>
>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>
>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]> wrote:
>>>>
>>>>> My first streaming pipeline is pretty simple, it just pipes a queue into files:
>>>>>
>>>>> - read JSON objects from PubsubIO
>>>>> - event time = processing time
>>>>> - 5 minute windows
>>>>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>>>>
>>>>> When the pipeline gets behind (for example, when the pipeline is stopped for an hour and restarted) this creates problems because the amount of data per file becomes too much, and the pipeline stays behind.
>>>>>
>>>>> I believe that what is needed is a new step, just before "write to GCS":
>>>>>
>>>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>>>
>>>>> My next idea is to implement my own Partition and PartitionDoFn that accept a PCollectionView<Long> from Count.perElement().
>>>>>
>>>>> Is there a more built-in way to accomplish dynamic partitions by element quantity?
>>>>>
>>>>> Jacob
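For readers following along, here is a minimal sketch of the kind of batching stateful DoFn described in the thread, assuming the Beam 2.x Java SDK; it is not Jacob's actual implementation (his gist is linked above). The names BatchFn, MAX_BUFFER_SIZE, and STALE_AFTER are illustrative. Note that a stateful DoFn runs per key, so the input must be a keyed PCollection (KV):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers elements per key; emits a batch when the buffer is full
// or when a processing-time "staleness" timer fires.
class BatchFn extends DoFn<KV<String, String>, Iterable<String>> {
  private static final int MAX_BUFFER_SIZE = 500;                        // illustrative
  private static final Duration STALE_AFTER = Duration.standardMinutes(1); // illustrative

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

  @TimerId("stale")
  private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext c,
                      @StateId("buffer") BagState<String> buffer,
                      @StateId("count") ValueState<Integer> count,
                      @TimerId("stale") Timer stale) {
    Integer stored = count.read();
    int n = (stored == null) ? 0 : stored;
    if (n == 0) {
      // First element of a fresh batch: arm the staleness timer.
      stale.offset(STALE_AFTER).setRelative();
    }
    buffer.add(c.element().getValue());
    count.write(n + 1);
    if (n + 1 >= MAX_BUFFER_SIZE) {
      flush(buffer, count, c::output);
    }
  }

  @OnTimer("stale")
  public void onStale(OnTimerContext c,
                      @StateId("buffer") BagState<String> buffer,
                      @StateId("count") ValueState<Integer> count) {
    flush(buffer, count, c::output);
  }

  private static void flush(BagState<String> buffer,
                            ValueState<Integer> count,
                            java.util.function.Consumer<Iterable<String>> out) {
    // Copy before clearing so the emitted batch does not reference cleared state.
    List<String> batch = new ArrayList<>();
    for (String s : buffer.read()) {
      batch.add(s);
    }
    if (!batch.isEmpty()) {
      out.accept(batch);
    }
    buffer.clear();
    count.clear();
  }
}
```

As in the thread, an initial-timestamp state could be added and `outputWithTimestamp()` used in place of plain `output()` so emitted batches carry the timestamp of their oldest element.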
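The partitioning step proposed at the end of the thread reduces to simple arithmetic: split totalElements into ceil(totalElements / maxElements) groups. A hypothetical stdlib-only helper (the class and method names are invented for illustration) sketches that computation; in the pipeline, totalElements would arrive as a side input, e.g. a PCollectionView<Long> built from a global element count, and the shard index would be assigned inside the custom Partition/DoFn:

```java
public class DynamicShards {
  // Number of output shards so that no shard receives more than maxElements.
  static int numShards(long totalElements, long maxElements) {
    if (totalElements <= 0) {
      return 1;  // always produce at least one shard
    }
    return (int) ((totalElements + maxElements - 1) / maxElements);  // ceiling division
  }

  // Assign an element (by hash) to one of the shards; floorMod keeps
  // the result non-negative even for negative hash codes.
  static int shardFor(long elementHash, int numShards) {
    return (int) Math.floorMod(elementHash, numShards);
  }

  public static void main(String[] args) {
    System.out.println(numShards(10_000, 3_000));  // prints 4
  }
}
```

One detail worth noting: `Count.perElement()` yields a count per distinct element, so for the total element count the side input would more likely be built from a single global count of the window's contents.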
