Feel free to share it with an online paste or a link to a github repo containing the code.
Other users may be interested in your solution.

On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]> wrote:

> Lukasz-
>
> That worked. I created a stateful DoFn with a stale timer, an initial
> timestamp state, and a counter state, along with a buffer of elements to
> bundle. When the counter or timer exceeds its max value, outputWithTimestamp().
>
> I'm happy to post the entire implementation somewhere; not sure about
> etiquette and how this mailing list handles attachments.
>
> Jacob
>
> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]> wrote:
>
>> Have you considered using a stateful DoFn? Buffering/batching based upon
>> a certain number of elements is shown in this blog[1] and could be extended
>> for your use case.
>>
>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]> wrote:
>>
>>> My first streaming pipeline is pretty simple; it just pipes a queue into
>>> files:
>>>
>>> - read JSON objects from PubsubIO
>>> - event time = processing time
>>> - 5 minute windows
>>> - write n files to GCS (TextIO.withNumShards() is not dynamic)
>>>
>>> When the pipeline gets behind (for example, when it is stopped for an
>>> hour and restarted), this creates problems: the amount of data per file
>>> becomes too large, and the pipeline stays behind.
>>>
>>> I believe what is needed is a new step, just before "write to GCS":
>>>
>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>
>>> My next idea is to implement my own Partition and PartitionDoFn that
>>> accept a PCollectionView<Long> from Count.perElement().
>>>
>>> Is there a more built-in way to accomplish dynamic partitions by element
>>> quantity?
>>>
>>> Jacob
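For readers following along: the pattern Jacob describes (a bag of buffered elements, a counter, an initial timestamp, and a stale timer, flushing when either the count or the staleness limit is exceeded) can be sketched outside Beam in plain Python. The class and method names below are purely illustrative stand-ins for the Beam state/timer API, not part of any SDK; the comments note which Beam concept each piece approximates:

```python
import time

class BatchBuffer:
    """Illustrative sketch of the stateful-DoFn batching pattern from the
    thread above: buffer elements, flush when a max element count is reached
    or the buffer goes stale. All names here are hypothetical, not Beam API."""

    def __init__(self, max_elements, max_stale_seconds, flush, clock=time.monotonic):
        self.max_elements = max_elements
        self.max_stale_seconds = max_stale_seconds
        self.flush = flush           # callback standing in for outputWithTimestamp()
        self.clock = clock
        self.buffer = []             # ~ BagState of buffered elements
        self.first_seen = None       # ~ ValueState holding the initial timestamp

    def add(self, element):
        """~ @ProcessElement: buffer the element, flush on count or staleness."""
        if self.first_seen is None:
            self.first_seen = self.clock()  # ~ also where the stale timer is set
        self.buffer.append(element)
        if (len(self.buffer) >= self.max_elements
                or self.clock() - self.first_seen >= self.max_stale_seconds):
            self._flush()

    def on_timer(self):
        """~ @OnTimer: the stale timer fired, so flush whatever is buffered."""
        if self.buffer:
            self._flush()

    def _flush(self):
        self.flush(list(self.buffer))
        self.buffer.clear()
        self.first_seen = None
```

In an actual Beam pipeline, `buffer` and `first_seen` would be `BagState` and `ValueState` declared with `@StateId`, and `on_timer` would be an `@OnTimer` method for a processing-time `@TimerId`, as shown in the timely-processing blog post linked above.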
