Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c

Should I consider StateId value mutations to be non-atomic?

Jacob

On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik wrote:
> Feel free to share it with an online paste or a link to a GitHub repo
> containing the code.
>
> Other users may be interested in your solution.
>
> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble wrote:
>
>> Lukasz-
>>
>> That worked. I created a stateful DoFn with a stale timer, an initial
>> timestamp state, and a counter state, along with a buffer of elements to
>> bundle. When the counter or the timer exceeds its maximum, the DoFn emits
>> the buffered elements with outputWithTimestamp().
>>
>> I'm happy to post the entire implementation somewhere; I'm just not sure
>> about the etiquette or how this mailing list handles attachments.
>>
>> Jacob
>>
>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik wrote:
>>
>>> Have you considered using a stateful DoFn? Buffering/batching based upon
>>> a certain number of elements is shown in this blog post [1] and could be
>>> extended for your use case.
>>>
>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble wrote:
>>>
>>>> My first streaming pipeline is pretty simple; it just pipes a queue
>>>> into files:
>>>>
>>>> - read JSON objects from PubsubIO
>>>> - event time = processing time
>>>> - 5-minute windows
>>>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>>>
>>>> When the pipeline gets behind (for example, when it is stopped for an
>>>> hour and restarted), this causes problems: each file receives too much
>>>> data, and the pipeline stays behind.
>>>>
>>>> I believe that what is needed is a new step, just before "write to GCS":
>>>>
>>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>>
>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>> accept a PCollectionView<Long> from Count.perElement().
>>>>
>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>> element quantity?
>>>>
>>>> Jacob
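
The gist itself is not reproduced here. As a rough illustration of the design Jacob describes (a buffer of elements, a counter, an initial-timestamp value, and a stale timer, flushing when either limit is exceeded), the Java sketch below models only the flush logic with plain fields, so it runs without Beam. The class `BatchBuffer`, its methods, and the explicit `nowMillis` clock parameter are all hypothetical; in an actual stateful DoFn the fields would be `@StateId` state cells and `onTimer` would be driven by a processing-time `@TimerId` timer instead of being called by hand.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Beam-free model of the batching logic described in the thread.
 * In a real stateful DoFn, each field below would be a state cell scoped to
 * one key and window, and onTimer() would be invoked by a timer.
 */
class BatchBuffer<T> {
    private final int maxElements;      // flush once this many elements are buffered
    private final long maxDelayMillis;  // flush once the oldest element is this old
    private final List<T> buffer = new ArrayList<>(); // "buffer of elements to bundle"
    private long count = 0;                            // "counter state"
    private Long firstTimestampMillis = null;          // "initial timestamp state"

    BatchBuffer(int maxElements, long maxDelayMillis) {
        this.maxElements = maxElements;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Buffers one element; returns the full batch if the count limit is reached, else null. */
    List<T> add(T element, long nowMillis) {
        if (firstTimestampMillis == null) {
            // First element of a new batch: start the staleness clock.
            firstTimestampMillis = nowMillis;
        }
        buffer.add(element);
        count++;
        return count >= maxElements ? flush() : null;
    }

    /** Models the stale timer firing: returns the batch if it is old enough, else null. */
    List<T> onTimer(long nowMillis) {
        if (firstTimestampMillis != null && nowMillis - firstTimestampMillis >= maxDelayMillis) {
            return flush();
        }
        return null;
    }

    /** Emits a copy of the buffered batch and clears all per-batch state. */
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        count = 0;
        firstTimestampMillis = null;
        return batch;
    }
}
```

The separate counter mirrors the design in the thread; in Beam it lets the DoFn check the threshold without reading back the whole buffered bag on every element. Beam state is scoped per key and window, so the input would need to be a keyed (`KV`) collection before such a DoFn.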
