Re: How to window by quantity of data?

2017-10-20 Thread Jacob Marble
Final, working version is in the original gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c The result is heavily inspired by GroupIntoBatches, and doesn't use windowing. Jacob On Wed, Oct 18, 2017 at 2:49 PM, Jacob Marble wrote: > Thomas, I

Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Thomas, I reworked using the GroupIntoBatches PTransform, and things are working great (with fewer lines of code). Thanks Jacob On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble wrote: > That gist isn't working right now, but I'll update it when I find the bug. > > The direct

Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
That gist isn't working right now, but I'll update it when I find the bug. The direct runner grows memory, but never writes files. The dataflow runner writes temp files, but FinalizeGroupByKey never moves them to the final destination. Jacob On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble

Re: How to window by quantity of data?

2017-10-18 Thread Thomas Groh
The calls are essentially atomic per-key. More specifically, the two calls can occur in one of two ways: 1) They are for elements which share a key. If so, the calls _must_ be made serially, so the second read() will see the result of the first write() 2) They are for elements which do not share
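
A minimal sketch of the per-key guarantee described above, written in plain Java rather than against the Beam SDK. It is an illustration under assumptions, not how any runner is implemented: the class `PerKeyCounter` and its methods are hypothetical names, and `ConcurrentHashMap.merge` stands in for the rule that a read() followed by a write() on one key is never interleaved with another call for that same key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for per-key counter state; not part of Beam. */
class PerKeyCounter {
    private final Map<String, Integer> state = new ConcurrentHashMap<>();

    /** Read-modify-write for one key; merge() performs it atomically per key. */
    void process(String key) {
        state.merge(key, 1, Integer::sum);
    }

    /** Returns the current count for a key, or 0 if the key was never seen. */
    int read(String key) {
        return state.getOrDefault(key, 0);
    }

    /** Submits perKey increments for each of two keys from a small thread pool. */
    static PerKeyCounter runDemo(int perKey) {
        PerKeyCounter counter = new PerKeyCounter();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < perKey; i++) {
            pool.submit(() -> counter.process("a"));
            pool.submit(() -> counter.process("b"));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("demo tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter;
    }

    public static void main(String[] args) {
        PerKeyCounter counter = runDemo(1000);
        System.out.println("a=" + counter.read("a") + " b=" + counter.read("b"));
    }
}
```

Because each key's update is applied as one step, no increment is lost even though several threads call process() concurrently.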

Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Consider multiple instances of a DoFn: @ProcessElement public void window(ProcessContext context, @StateId("count") ValueState<Integer> countState) { int count = MoreObjects.firstNonNull(countState.read(), 0); count += 1; countState.write(count); If two instances read

Re: How to window by quantity of data?

2017-10-18 Thread Lukasz Cwik
What do you mean by non-atomic? All output/state changes/timers from a process bundle are an all or nothing change. So if processing a bundle fails, any state changes are discarded and the state is reset to what it was before the bundle was processed. On Wed, Oct 18, 2017 at 12:15 PM, Jacob
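
The all-or-nothing behaviour described above can be sketched in plain Java. This is a simplified model under assumptions, not Beam code: `BundleState` and `processBundle` are hypothetical names, a `null` element is used only to simulate a failure partway through a bundle, and state changes are staged on a copy that replaces the committed state only if the whole bundle succeeds.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of bundle-scoped state commits; not part of Beam. */
class BundleState {
    private Map<String, Integer> committed = new HashMap<>();

    /**
     * Counts each key in the bundle. A null element simulates a processing
     * failure. Returns true if the bundle committed, false if it was discarded.
     */
    boolean processBundle(List<String> bundle) {
        // Stage all changes on a copy so a failure leaves committed state untouched.
        Map<String, Integer> staged = new HashMap<>(committed);
        try {
            for (String key : bundle) {
                if (key == null) {
                    throw new IllegalStateException("simulated failure");
                }
                staged.merge(key, 1, Integer::sum);
            }
        } catch (IllegalStateException e) {
            return false; // staged changes are dropped
        }
        committed = staged; // success: publish every change at once
        return true;
    }

    /** Returns the committed count for a key, or 0 if none. */
    int read(String key) {
        return committed.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        BundleState state = new BundleState();
        state.processBundle(Arrays.asList("a", "a"));
        state.processBundle(Arrays.asList("a", null)); // fails, so nothing changes
        System.out.println("a=" + state.read("a"));
    }
}
```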

Re: How to window by quantity of data?

2017-10-18 Thread Lukasz Cwik
Feel free to share it with an online paste or a link to a github repo containing the code. Other users may be interested in your solution. On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble wrote: > Lukasz- > > That worked. I created a stateful DoFn with a stale timer, an

Re: How to window by quantity of data?

2017-10-17 Thread Jacob Marble
Lukasz- That worked. I created a stateful DoFn with a stale timer, an initial timestamp state, and a counter state, along with a buffer of elements to bundle. When the counter or the timer exceeds its max value, the DoFn calls outputWithTimestamp(). I'm happy to post the entire implementation somewhere, not sure
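
The flush rule described above (emit the buffer when either the element count or the staleness limit is reached) can be sketched in plain Java. This is not the code from the gist and does not use the Beam SDK: `CountOrAgeBuffer`, `add`, and `onTimer` are hypothetical names, timestamps are passed in explicitly, and the use of `>=` for both limits is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Beam-free sketch: buffer elements and flush on count or age. */
class CountOrAgeBuffer<T> {
    private final int maxCount;       // flush once this many elements are buffered
    private final long maxAgeMillis;  // flush once the oldest element is this old
    private final List<T> buffer = new ArrayList<>();
    private long firstTimestamp = -1; // arrival time of the oldest buffered element

    CountOrAgeBuffer(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Adds one element; returns the flushed batch, or an empty list if none. */
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstTimestamp = nowMillis; // plays the role of the initial timestamp state
        }
        buffer.add(element);
        return buffer.size() >= maxCount ? flush() : new ArrayList<>();
    }

    /** Stands in for the stale timer firing; flushes only if the buffer is old enough. */
    List<T> onTimer(long nowMillis) {
        boolean stale = !buffer.isEmpty() && nowMillis - firstTimestamp >= maxAgeMillis;
        return stale ? flush() : new ArrayList<>();
    }

    /** Returns a copy of the buffered elements and resets the buffer. */
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstTimestamp = -1;
        return batch;
    }

    public static void main(String[] args) {
        CountOrAgeBuffer<String> buf = new CountOrAgeBuffer<>(3, 1000L);
        buf.add("a", 0L);
        buf.add("b", 10L);
        System.out.println("count flush: " + buf.add("c", 20L));
        buf.add("d", 100L);
        System.out.println("timer flush: " + buf.onTimer(1100L));
    }
}
```

In a real stateful DoFn the buffer, counter, and first timestamp would live in per-key state and the staleness check would be driven by a timer; here they are ordinary fields so the rule can be read in isolation.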

How to window by quantity of data?

2017-10-17 Thread Jacob Marble
My first streaming pipeline is pretty simple: it just pipes a queue into files: - read JSON objects from PubsubIO - event time = processing time - 5 minute windows ( - write n files to GCS (TextIO.withNumShards() is not dynamic) When the pipeline gets behind (for example, when the pipeline is