Final, working version is in the original gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
The result is heavily inspired by GroupIntoBatches, and doesn't use windowing.

Jacob

On Wed, Oct 18, 2017 at 2:49 PM, Jacob Marble <[email protected]> wrote:

> Thomas, I reworked using the GroupIntoBatches PTransform, and things are
> working great (with fewer lines of code).
>
> Thanks
>
> Jacob
>
> On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble <[email protected]> wrote:
>
>> That gist isn't working right now, but I'll update it when I find the bug.
>>
>> The direct runner grows memory, but never writes files.
>> The dataflow runner writes temp files, but FinalizeGroupByKey never moves
>> them to the final destination.
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <[email protected]>
>> wrote:
>>
>>> Consider multiple instances of a DoFn:
>>>
>>> @ProcessElement
>>> public void window(ProcessContext context,
>>>     @StateId("count") ValueState<Integer> countState) {
>>>
>>>   int count = MoreObjects.firstNonNull(countState.read(), 0);
>>>   count += 1;
>>>   countState.write(count);
>>> }
>>>
>>> If two instances read countState, then write countState, will countState
>>> be incremented by 1, but not by 2?
>>>
>>> Jacob
>>>
>>> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> What do you mean by non-atomic?
>>>>
>>>> All output/state changes/timers from a process bundle are an
>>>> all-or-nothing change. So if processing a bundle fails, any state
>>>> changes are discarded and the state is reset to what it was before the
>>>> bundle was processed.
>>>>
>>>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <[email protected]>
>>>> wrote:
>>>>
>>>>> Here's a gist:
>>>>> https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>>>
>>>>> Should I consider StateId value mutations to be non-atomic?
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Feel free to share it with an online paste or a link to a github repo
>>>>>> containing the code.
>>>>>>
>>>>>> Other users may be interested in your solution.
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Lukasz-
>>>>>>>
>>>>>>> That worked. I created a stateful DoFn with a stale timer, an
>>>>>>> initial timestamp state, and a counter state, along with a buffer of
>>>>>>> elements to bundle. When the counter or timer exceeds max values,
>>>>>>> outputWithTimestamp().
>>>>>>>
>>>>>>> I'm happy to post the entire implementation somewhere, not sure
>>>>>>> about etiquette and how this mailing list handles attachments.
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Have you considered using a stateful DoFn? Buffering/batching based
>>>>>>>> upon a certain number of elements is shown in this blog[1] and could
>>>>>>>> be extended for your use case.
>>>>>>>>
>>>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>>>
>>>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> My first streaming pipeline is pretty simple, it just pipes a
>>>>>>>>> queue into files:
>>>>>>>>>
>>>>>>>>> - read JSON objects from PubsubIO
>>>>>>>>> - event time = processing time
>>>>>>>>> - 5 minute windows (
>>>>>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>>>>>
>>>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>>>> stopped for an hour and restarted) this creates problems because the
>>>>>>>>> amount of data per file becomes too much, and the pipeline stays
>>>>>>>>> behind.
>>>>>>>>>
>>>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>>>> GCS":
>>>>>>>>>
>>>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>>>> groups
>>>>>>>>>
>>>>>>>>> My next idea is to implement my own Partition and PartitionDoFn
>>>>>>>>> that accept a PCollectionView<Long> from Count.perElement().
>>>>>>>>>
>>>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>>>> element quantity?
>>>>>>>>>
>>>>>>>>> Jacob
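
The sizing rule from the original question, ceil(totalElements / maxElements) groups, can be sketched in plain Java. This is only an illustration, not code from the gist: the names GroupSizing, groupCount and split are hypothetical, and the sketch works on an in-memory List rather than a PCollection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Beam or the gist.
final class GroupSizing {
  private GroupSizing() {}

  // Number of groups needed so that no group holds more than maxElements,
  // i.e. ceil(totalElements / maxElements) using integer arithmetic only.
  static long groupCount(long totalElements, long maxElements) {
    if (totalElements < 0 || maxElements <= 0) {
      throw new IllegalArgumentException(
          "totalElements must be >= 0 and maxElements must be > 0");
    }
    long full = totalElements / maxElements;
    return totalElements % maxElements == 0 ? full : full + 1;
  }

  // Splits the input into consecutive groups of at most maxElements each.
  static <T> List<List<T>> split(List<T> elements, int maxElements) {
    if (maxElements <= 0) {
      throw new IllegalArgumentException("maxElements must be > 0");
    }
    List<List<T>> groups = new ArrayList<>();
    for (int start = 0; start < elements.size(); start += maxElements) {
      int end = Math.min(start + maxElements, elements.size());
      // Copy the sub-list so each group is independent of the input list.
      groups.add(new ArrayList<>(elements.subList(start, end)));
    }
    return groups;
  }
}
```

For example, 10 elements with maxElements = 3 need 4 groups. In a streaming pipeline the total is not known up front, which is presumably why the thread moved to count-based buffering instead.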

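The stateful DoFn described in the thread (a buffer of elements, a counter, the timestamp of the first buffered element, and a stale timer) can be modeled without any Beam APIs. The sketch below is an assumption about the general shape of that logic, not the gist's implementation: BatchBuffer, add and onTimer are hypothetical names, the buffer size stands in for the counter state, and timestamps are passed in by the caller instead of coming from a runner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of count-or-timeout batching.
// It does not use Beam state or timers; it only mirrors their roles.
final class BatchBuffer<T> {
  private final int maxCount;        // flush once this many elements are buffered
  private final long maxAgeMillis;   // flush once the oldest element is this stale
  private final List<T> buffer = new ArrayList<>();
  private long firstTimestampMillis; // only meaningful while the buffer is non-empty

  BatchBuffer(int maxCount, long maxAgeMillis) {
    if (maxCount <= 0 || maxAgeMillis <= 0) {
      throw new IllegalArgumentException("limits must be positive");
    }
    this.maxCount = maxCount;
    this.maxAgeMillis = maxAgeMillis;
  }

  // Plays the role of @ProcessElement: buffer the element and return the
  // full batch if the count limit is reached, otherwise an empty list.
  List<T> add(T element, long nowMillis) {
    if (buffer.isEmpty()) {
      firstTimestampMillis = nowMillis; // remember when this batch started
    }
    buffer.add(element);
    return buffer.size() >= maxCount ? flush() : new ArrayList<>();
  }

  // Plays the role of the stale timer callback: return the pending batch if
  // its first element is at least maxAgeMillis old, otherwise an empty list.
  List<T> onTimer(long nowMillis) {
    boolean stale =
        !buffer.isEmpty() && nowMillis - firstTimestampMillis >= maxAgeMillis;
    return stale ? flush() : new ArrayList<>();
  }

  // Hands out a copy of the buffered elements and resets the buffer.
  private List<T> flush() {
    List<T> batch = new ArrayList<>(buffer);
    buffer.clear();
    return batch;
  }
}
```

With maxCount = 3, the third add() returns a batch of three elements; a single leftover element is returned by onTimer() once it is older than maxAgeMillis. In Beam itself, GroupIntoBatches.ofSize(n) on a keyed PCollection covers the count-based part of this behavior.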