Thomas, I reworked it using the GroupIntoBatches PTransform, and things are working great (with fewer lines of code).
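In the Beam Java SDK, GroupIntoBatches takes a keyed PCollection<KV<K, V>> and emits each key's values in batches of at most a fixed size (GroupIntoBatches.ofSize(n)). The plain-Java sketch below models only that per-key, size-bounded behavior so it runs without any Beam dependency. It is not Beam's implementation: the class name SizeBatcher is hypothetical, and the final leftover flush is an assumption standing in for the point where Beam emits a key's remaining, smaller batch when the window ends.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (not Beam code) of per-key, size-bounded batching.
 * SizeBatcher is a hypothetical name used only for illustration.
 */
final class SizeBatcher {
  private SizeBatcher() {}

  /** Groups each key's values, in arrival order, into batches of at most batchSize. */
  static Map<String, List<List<String>>> batch(
      List<Map.Entry<String, String>> input, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    // Per-key buffer of values not yet emitted.
    Map<String, List<String>> buffers = new LinkedHashMap<>();
    // Per-key list of emitted batches.
    Map<String, List<List<String>>> output = new LinkedHashMap<>();

    for (Map.Entry<String, String> kv : input) {
      List<String> buffer = buffers.computeIfAbsent(kv.getKey(), k -> new ArrayList<>());
      buffer.add(kv.getValue());
      // Emit a full batch as soon as the buffer reaches batchSize.
      if (buffer.size() == batchSize) {
        output.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(new ArrayList<>(buffer));
        buffer.clear();
      }
    }

    // Assumption: flush leftovers once the input ends, standing in for end of window.
    for (Map.Entry<String, List<String>> e : buffers.entrySet()) {
      if (!e.getValue().isEmpty()) {
        output.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(new ArrayList<>(e.getValue()));
      }
    }
    return output;
  }
}
```

Because batches are formed per key, the number of elements per output file is bounded by the batch size no matter how far behind the pipeline is.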
Thanks,
Jacob

On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble <jmar...@kochava.com> wrote:

> That gist isn't working right now, but I'll update it when I find the bug.
>
> The direct runner grows memory, but never writes files.
> The Dataflow runner writes temp files, but FinalizeGroupByKey never moves
> them to the final destination.
>
> Jacob
>
> On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Consider multiple instances of a DoFn:
>>
>>     @StateId("count")
>>     private final StateSpec<ValueState<Integer>> countSpec =
>>         StateSpecs.value(VarIntCoder.of());
>>
>>     @ProcessElement
>>     public void window(ProcessContext context,
>>         @StateId("count") ValueState<Integer> countState) {
>>       // Read-modify-write of the per-key, per-window counter.
>>       int count = MoreObjects.firstNonNull(countState.read(), 0);
>>       count += 1;
>>       countState.write(count);
>>     }
>>
>> If two instances read countState, then write countState, will countState
>> be incremented by only 1 rather than by 2?
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> What do you mean by non-atomic?
>>>
>>> All output, state changes, and timers from processing a bundle are an
>>> all-or-nothing change. So if processing a bundle fails, any state changes
>>> are discarded and the state is reset to what it was before the bundle was
>>> processed.
>>>
>>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>>
>>>> Should I consider StateId value mutations to be non-atomic?
>>>>
>>>> Jacob
>>>>
>>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Feel free to share it with an online paste or a link to a GitHub repo
>>>>> containing the code.
>>>>>
>>>>> Other users may be interested in your solution.
>>>>>
>>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>>>> wrote:
>>>>>
>>>>>> Lukasz-
>>>>>>
>>>>>> That worked.
>>>>>> I created a stateful DoFn with a stale timer, an initial timestamp
>>>>>> state, and a counter state, along with a buffer of elements to bundle.
>>>>>> When either the counter or the timer exceeds its maximum value, it calls
>>>>>> outputWithTimestamp().
>>>>>>
>>>>>> I'm happy to post the entire implementation somewhere, but I'm not sure
>>>>>> about the etiquette or how this mailing list handles attachments.
>>>>>>
>>>>>> Jacob
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Have you considered using a stateful DoFn? Buffering/batching based
>>>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>>>> extended for your use case.
>>>>>>>
>>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>>
>>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> My first streaming pipeline is pretty simple; it just pipes a queue
>>>>>>>> into files:
>>>>>>>>
>>>>>>>> - read JSON objects from PubsubIO
>>>>>>>> - event time = processing time
>>>>>>>> - 5-minute windows
>>>>>>>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>>>>>>>
>>>>>>>> When the pipeline falls behind (for example, when it is stopped for
>>>>>>>> an hour and restarted), this creates problems: the amount of data
>>>>>>>> per file becomes too large, and the pipeline stays behind.
>>>>>>>>
>>>>>>>> I believe what is needed is a new step, just before "write to GCS":
>>>>>>>>
>>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>>>   groups
>>>>>>>>
>>>>>>>> My next idea is to implement my own Partition and PartitionDoFn
>>>>>>>> that accept a PCollectionView<Long> from Count.perElement().
>>>>>>>>
>>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>>> element quantity?
>>>>>>>>
>>>>>>>> Jacob
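The stateful DoFn described in the Oct 17, 9:05 PM reply keeps a buffer, a counter state, an initial-timestamp state, and a stale timer, and emits when either limit is exceeded. The plain-Java sketch below models only that flush rule, outside of Beam: time is passed in explicitly instead of coming from a Beam timer, and the names StaleBatchBuffer, maxCount, and maxAgeMillis are hypothetical. It is not the implementation from the gist.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java sketch (not Beam code) of count-or-staleness batching.
 * Fields mirror the state described in the thread: a buffer, a counter,
 * and the timestamp of the first buffered element.
 */
final class StaleBatchBuffer {
  private final int maxCount;
  private final long maxAgeMillis;
  private final List<String> buffer = new ArrayList<>();
  private int count = 0;                  // mirrors the "counter state"
  private long firstTimestampMillis = -1; // mirrors the "initial timestamp state"; -1 means empty

  StaleBatchBuffer(int maxCount, long maxAgeMillis) {
    if (maxCount <= 0 || maxAgeMillis <= 0) {
      throw new IllegalArgumentException("limits must be positive");
    }
    this.maxCount = maxCount;
    this.maxAgeMillis = maxAgeMillis;
  }

  /** Buffers an element; returns the flushed batch if a limit is reached, else an empty list. */
  List<String> add(String element, long nowMillis) {
    if (count == 0) {
      firstTimestampMillis = nowMillis; // first element starts the staleness clock
    }
    buffer.add(element);
    count++;
    if (count >= maxCount || nowMillis - firstTimestampMillis >= maxAgeMillis) {
      return flush();
    }
    return Collections.emptyList();
  }

  /** Stand-in for the stale timer firing: flushes if the oldest element is too old. */
  List<String> onTimer(long nowMillis) {
    if (count > 0 && nowMillis - firstTimestampMillis >= maxAgeMillis) {
      return flush();
    }
    return Collections.emptyList();
  }

  /** Emits the buffered elements and resets all state. */
  private List<String> flush() {
    List<String> batch = new ArrayList<>(buffer);
    buffer.clear();
    count = 0;
    firstTimestampMillis = -1;
    return batch;
  }
}
```

Checking the age in both add() and onTimer() means a partial batch is still emitted when no new elements arrive, which is the role the stale timer plays in the DoFn.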