Final, working version is in the original gist:
https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c

The result is heavily inspired by GroupIntoBatches, and doesn't use
windowing.

Jacob

On Wed, Oct 18, 2017 at 2:49 PM, Jacob Marble <[email protected]> wrote:

> Thomas, I reworked using the GroupIntoBatches PTransform, and things
> working great (with fewer lines of code).
>
> Thanks
>
> Jacob
>
> On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble <[email protected]> wrote:
>
>> That gist isn't working right now, but I'll update it when I find the bug.
>>
>> The direct runner grows memory, but never writes files.
>> The dataflow runner writes temp files, but FinalizeGroupByKey never moves
>> them to the final destination.
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <[email protected]>
>> wrote:
>>
>>> Consider multiple instances of a DoFn:
>>>
>>>     @ProcessElement
>>>     public void window(ProcessContext context,
>>>         @StateId("count") ValueState<Integer> countState) {
>>>
>>>     int count = MoreObjects.firstNonNull(countState.read(), 0);
>>>     count += 1;
>>>     countState.write(count);
>>>
>>> If two instances read countState, then write countState, will countState
>>> not be incremented by 1, but not by 2?
>>>
>>> Jacob
>>>
>>> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> What do you mean by non-atomic?
>>>>
>>>> All output/state changes/timers from a process bundle are an all or
>>>> nothing change. So if processing a bundle fails, any state changes are
>>>> discarded and the state is reset to what it was before the bundle was
>>>> processed.
>>>>
>>>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <[email protected]>
>>>> wrote:
>>>>
>>>>> Here's a gist: https://gist.github.com/jacobm
>>>>> arble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>>>
>>>>> Should I consider StateId value mutations to be non-atomic?
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Feel free to share it with an online paste or a link to a github repo
>>>>>> containing the code.
>>>>>>
>>>>>> Other users may be interested in your solution.
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Lukasz-
>>>>>>>
>>>>>>> That worked. I created a stateful DoFn with a stale timer, an
>>>>>>> initial timestamp state, and a counter state, along with a buffer of
>>>>>>> elements to bundle. When the counter or timer exceeds max values,
>>>>>>> outputWithTimestamp().
>>>>>>>
>>>>>>> I'm happy to post the entire implementation somewhere, not sure
>>>>>>> about etiquette and how this mailing list handles attachments.
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Have you considered using a stateful DoFn, buffering/batching based
>>>>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>>>>> extended for your usecase.
>>>>>>>>
>>>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>>>
>>>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> My first streaming pipeline is pretty simple, it just pipes a
>>>>>>>>> queue into files:
>>>>>>>>>
>>>>>>>>> - read JSON objects from PubsubIO
>>>>>>>>> - event time = processing time
>>>>>>>>> - 5 minute windows (
>>>>>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>>>>>
>>>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>>>> stopped for an hour and restarted) this creates problems because the 
>>>>>>>>> amount
>>>>>>>>> of data per file becomes too much, and the pipeline stays behind.
>>>>>>>>>
>>>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>>>> GCS":
>>>>>>>>>
>>>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>>>> groups
>>>>>>>>>
>>>>>>>>> My next idea is to implement my own Partition and PartitionDoFn
>>>>>>>>> that accept a PCollectionView<Long> from Count.perElemen().
>>>>>>>>>
>>>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>>>> element quantity?
>>>>>>>>>
>>>>>>>>> Jacob
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to