Final, working version is in the original gist:
https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
The result is heavily inspired by GroupIntoBatches, and doesn't use
windowing.
Jacob
On Wed, Oct 18, 2017 at 2:49 PM, Jacob Marble wrote:
Thomas, I reworked this using the GroupIntoBatches PTransform, and things
are working great (with fewer lines of code).
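For reference, a minimal sketch of what that rework can look like (this is a sketch only: the Beam Java SDK's GroupIntoBatches, with BATCH_SIZE, NUM_KEYS, and the key function as my own placeholder assumptions, not the actual code from the gist):

```java
// GroupIntoBatches buffers KV elements per key and emits
// KV<key, Iterable<value>> once a batch fills -- no windowing needed.
PCollection<KV<Integer, Iterable<String>>> batches =
    pipeline
        .apply(PubsubIO.readStrings().fromSubscription(subscription))
        // Spread elements across a fixed number of keys so batching
        // can happen in parallel (NUM_KEYS is an assumption).
        .apply(WithKeys.of((String line) -> line.hashCode() % NUM_KEYS)
            .withKeyType(TypeDescriptors.integers()))
        .apply(GroupIntoBatches.ofSize(BATCH_SIZE));

// Downstream, a DoFn can write each Iterable<String> as one file.
```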
Thanks
Jacob
On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble wrote:
That gist isn't working right now, but I'll update it when I find the bug.
The direct runner grows memory, but never writes files.
The dataflow runner writes temp files, but FinalizeGroupByKey never moves
them to the final destination.
Jacob
On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble wrote:
The calls are essentially atomic per-key.
More specifically, the two calls can occur in one of two ways:
1) They are for elements which share a key. If so, the calls _must_ be made
serially, so the second read() will see the result of the first write().
2) They are for elements which do not share a key. If so, they touch
separate state cells and cannot observe each other's values.
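To make case 1 concrete, here is a plain-Java illustration (mine, not from the thread): because per-key state access runs serially, the second read sees the first write and no increment is lost.

```java
import java.util.HashMap;
import java.util.Map;

public class SerialStateDemo {
  public static void main(String[] args) {
    // Stand-in for a per-key state cell: one entry per key.
    Map<String, Integer> state = new HashMap<>();
    String key = "user-1"; // both elements share this key

    // The runner serializes these two read-modify-write calls.
    for (int i = 0; i < 2; i++) {
      int count = state.getOrDefault(key, 0); // read()
      state.put(key, count + 1);              // write()
    }

    // Serial execution: the second read() saw the first write().
    System.out.println(state.get(key)); // prints 2
  }
}
```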
Consider multiple instances of a DoFn:

@ProcessElement
public void window(ProcessContext context,
    @StateId("count") ValueState<Integer> countState) {
  int count = MoreObjects.firstNonNull(countState.read(), 0);
  count += 1;
  countState.write(count);
}

If two instances read the same value before either writes, won't an
increment be lost?
What do you mean by non-atomic?
All output/state changes/timers from a process bundle are an all or nothing
change. So if processing a bundle fails, any state changes are discarded
and the state is reset to what it was before the bundle was processed.
On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble wrote:
Feel free to share it with an online paste or a link to a github repo
containing the code.
Other users may be interested in your solution.
On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble wrote:
Lukasz-
That worked. I created a stateful DoFn with a stale timer, an initial
timestamp state, and a counter state, along with a buffer of elements to
bundle. When the counter or the timer exceeds its max value, I call
outputWithTimestamp().
I'm happy to post the entire implementation somewhere, not sure where
would be best.
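For readers who want the general shape of such a DoFn, here is a minimal sketch under my own assumptions (the state/timer names, MAX_BUFFER, and STALE values are invented, and a stateful DoFn requires a keyed input; this is not the actual implementation described above):

```java
// Buffers elements per key; flushes when the buffer fills or when a
// processing-time "stale" timer fires. All names/limits are assumed.
class BatchFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {
  private static final int MAX_BUFFER = 1000;                        // assumed
  private static final Duration STALE = Duration.standardMinutes(5); // assumed

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();
  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();
  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();
  @TimerId("stale")
  private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext ctx,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> countState,
      @StateId("key") ValueState<String> keyState,
      @TimerId("stale") Timer staleTimer) {
    int count = MoreObjects.firstNonNull(countState.read(), 0);
    if (count == 0) {
      staleTimer.offset(STALE).setRelative(); // arm timer on first element
      keyState.write(ctx.element().getKey()); // remember key for the timer
    }
    buffer.add(ctx.element().getValue());
    count++;
    if (count >= MAX_BUFFER) {
      ctx.outputWithTimestamp(
          KV.of(ctx.element().getKey(), buffer.read()), ctx.timestamp());
      buffer.clear();
      countState.write(0);
    } else {
      countState.write(count);
    }
  }

  @OnTimer("stale")
  public void onStale(OnTimerContext ctx,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> countState,
      @StateId("key") ValueState<String> keyState) {
    // Flush whatever is buffered when the batch goes stale.
    ctx.outputWithTimestamp(KV.of(keyState.read(), buffer.read()), ctx.timestamp());
    buffer.clear();
    countState.write(0);
  }
}
```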
My first streaming pipeline is pretty simple; it just pipes a queue into
files:
- read JSON objects from PubsubIO
- event time = processing time
- 5 minute windows
- write n files to GCS (TextIO.withNumShards() is not dynamic)
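The steps above can be sketched roughly like this (subscription and bucket names, and the shard count, are my own placeholders):

```java
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-sub")) // placeholder
    // With no timestamp attribute configured, PubsubIO stamps each
    // element on arrival, so event time ~= processing time.
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
    .apply(TextIO.write()
        .to("gs://my-bucket/output") // placeholder
        .withWindowedWrites()        // required for unbounded input
        .withNumShards(10));         // fixed shard count, not dynamic
p.run();
```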
When the pipeline gets behind (for example, when the pipeline is