Thomas, I reworked it using the GroupIntoBatches PTransform, and things are
working great (with fewer lines of code).
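For readers skimming the archive: GroupIntoBatches (created with GroupIntoBatches.ofSize(...)) takes a keyed PCollection of KV<K, V> and buffers elements per key and window, emitting a batch once it reaches the configured size; leftover elements are flushed when the window ends. Below is a plain-Java model of only that per-key batching, so it runs without Beam. KeyedBatcher and its methods are hypothetical names, not Beam's API, and flush() stands in for window expiration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key batching (hypothetical class, not Beam's API).
class KeyedBatcher<K, V> {
    private final int batchSize;
    // One buffer per key, analogous to GroupIntoBatches' per-key state.
    private final Map<K, List<V>> buffers = new LinkedHashMap<>();
    private final List<List<V>> emitted = new ArrayList<>();

    KeyedBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Buffer one element; emit that key's batch as soon as it is full.
    void add(K key, V value) {
        List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() == batchSize) {
            emitted.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Emit partial batches; stands in for the end-of-window flush.
    void flush() {
        for (List<V> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                emitted.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    List<List<V>> emitted() {
        return emitted;
    }
}
```

With a batch size of 2, adding ("a",1), ("a",2), ("b",3), ("a",4) emits [1, 2] immediately; flush() then emits the partial batches [4] and [3].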

Thanks

Jacob

On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble <jmar...@kochava.com> wrote:

> That gist isn't working right now, but I'll update it when I find the bug.
>
> The direct runner grows memory, but never writes files.
> The dataflow runner writes temp files, but FinalizeGroupByKey never moves
> them to the final destination.
>
> Jacob
>
> On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Consider multiple instances of a DoFn:
>>
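>>     @StateId("count")
>>     private final StateSpec<ValueState<Integer>> countSpec =
>>         StateSpecs.value(VarIntCoder.of());
>>
>>     @ProcessElement
>>     public void window(ProcessContext context,
>>         @StateId("count") ValueState<Integer> countState) {
>>       // Read-modify-write: an empty state cell is treated as 0.
>>       int count = MoreObjects.firstNonNull(countState.read(), 0);
>>       count += 1;
>>       countState.write(count);
>>     }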
>>
>> If two instances read countState and then write countState, will countState
>> be incremented by only 1 rather than by 2?
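The question above describes a classic lost update: if two read-modify-write sequences interleave on one cell, both read 0 and both write 1. The plain-Java sketch below replays that interleaving deterministically (no threads, no Beam). Cell is a hypothetical stand-in for ValueState<Integer>, and readOrZero mirrors MoreObjects.firstNonNull(read(), 0). As I understand Beam's model, state is scoped per key and window, and a runner does not process the same key and window concurrently, so this interleaving should not occur on a single state cell. The sketch only illustrates the concern being asked about.

```java
// Hypothetical stand-in for ValueState<Integer>; no Beam involved.
class Cell {
    private Integer value; // null until the first write, like an empty ValueState

    Integer read() {
        return value;
    }

    void write(int v) {
        value = v;
    }
}

class LostUpdateDemo {
    // Mirrors MoreObjects.firstNonNull(cell.read(), 0).
    static int readOrZero(Cell cell) {
        Integer v = cell.read();
        return v == null ? 0 : v;
    }

    // Both "instances" read before either one writes.
    static int interleaved() {
        Cell cell = new Cell();
        int a = readOrZero(cell); // instance A reads 0
        int b = readOrZero(cell); // instance B reads 0
        cell.write(a + 1);        // A writes 1
        cell.write(b + 1);        // B overwrites with 1, so A's increment is lost
        return cell.read();
    }

    // Each read-modify-write completes before the next one starts.
    static int serialized() {
        Cell cell = new Cell();
        for (int i = 0; i < 2; i++) {
            cell.write(readOrZero(cell) + 1);
        }
        return cell.read();
    }
}
```

interleaved() ends with a count of 1, while serialized() ends with 2.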
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> What do you mean by non-atomic?
>>>
>>> All outputs, state changes, and timers from processing a bundle are
>>> committed as an all-or-nothing change. So if processing a bundle fails,
>>> any state changes are discarded and the state is reset to what it was
>>> before the bundle was processed.
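To make the all-or-nothing behaviour concrete, here is a plain-Java model that covers state only. BundleStateModel is a hypothetical class, not a Beam or runner API. Writes made while processing a bundle go to a staged copy, which replaces the committed state only if every element succeeds. If any element throws, the staged writes are dropped and the state stays as it was before the bundle.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of bundle-level commit for state (not a Beam API).
class BundleStateModel {
    private final Map<String, Integer> committed = new HashMap<>();

    Integer get(String key) {
        return committed.get(key);
    }

    // Runs fn over every element against a staged copy of the state;
    // the copy becomes the committed state only if no element throws.
    boolean processBundle(List<String> elements,
                          BiConsumer<String, Map<String, Integer>> fn) {
        Map<String, Integer> staged = new HashMap<>(committed);
        try {
            for (String element : elements) {
                fn.accept(element, staged);
            }
        } catch (RuntimeException e) {
            return false; // bundle failed: staged writes are discarded
        }
        committed.clear();
        committed.putAll(staged);
        return true;
    }
}
```

A bundle that counts "x", "x", "y" commits x=2. A later bundle that increments x and then fails leaves x at 2.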
>>>
>>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>>
>>>> Should I consider StateId value mutations to be non-atomic?
>>>>
>>>> Jacob
>>>>
>>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Feel free to share it with an online paste or a link to a GitHub repo
>>>>> containing the code.
>>>>>
>>>>> Other users may be interested in your solution.
>>>>>
>>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>>>> wrote:
>>>>>
>>>>>> Lukasz-
>>>>>>
>>>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>>>> bundle. When the counter or the timer exceeds its maximum value, the DoFn
>>>>>> emits the buffered elements with outputWithTimestamp().
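The approach above uses a buffer, a counter, the timestamp of the first buffered element, and a staleness timer, flushing when either limit is hit. It can be modelled in plain Java as below. This sketches the flush logic only and makes a few assumptions: SizeOrAgeBatcher, maxCount and maxAgeMillis are hypothetical names, timestamps are passed in explicitly rather than coming from a Beam timer, and the real DoFn would emit each batch with outputWithTimestamp() instead of collecting it in a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "flush on count or staleness"; not a Beam API.
class SizeOrAgeBatcher<T> {
    private final int maxCount;
    private final long maxAgeMillis;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> flushed = new ArrayList<>();
    private long firstTimestamp = -1; // -1 means the buffer is empty

    SizeOrAgeBatcher(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Counterpart of @ProcessElement: buffer, then flush if the count limit is hit.
    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstTimestamp = nowMillis; // like the "initial timestamp" state
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            flush();
        }
    }

    // Counterpart of the stale timer firing: flush if the oldest element is too old.
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstTimestamp >= maxAgeMillis) {
            flush();
        }
    }

    private void flush() {
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
        firstTimestamp = -1;
    }

    List<List<T>> flushed() {
        return flushed;
    }
}
```

With maxCount = 3 and maxAgeMillis = 1000, three quick elements flush on count. A lone fourth element added at t=100 is flushed only when the timer fires at or after t=1100.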
>>>>>>
>>>>>> I'm happy to post the entire implementation somewhere, but I'm not sure
>>>>>> about the etiquette or how this mailing list handles attachments.
>>>>>>
>>>>>> Jacob
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Have you considered using a stateful DoFn? Buffering/batching based upon
>>>>>>> a certain number of elements is shown in this blog post[1] and could be
>>>>>>> extended for your use case.
>>>>>>>
>>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>>
>>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> My first streaming pipeline is pretty simple; it just pipes a queue
>>>>>>>> into files:
>>>>>>>>
>>>>>>>> - read JSON objects from PubsubIO
>>>>>>>> - event time = processing time
>>>>>>>> - 5-minute windows
>>>>>>>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>>>>>>>
>>>>>>>> When the pipeline falls behind (for example, when it is stopped for an
>>>>>>>> hour and restarted), this creates problems because the amount of data
>>>>>>>> per file becomes too large, and the pipeline stays behind.
>>>>>>>>
>>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>>> GCS":
>>>>>>>>
>>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>>> groups
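The group count in the bullet above is a ceiling division. As a small plain-Java sketch (GroupMath and its methods are hypothetical helpers), it can be computed with integer arithmetic. Assuming each element's position within the window is known, an element can then be assigned a group from that position, so no group holds more than maxElements elements. By contrast, with a fixed shard count n, each file holds roughly totalElements / n elements, which grows with the backlog.

```java
// Hypothetical helpers for sizing groups by element count.
class GroupMath {
    // ceil(totalElements / maxElements) using integer arithmetic only
    // (avoids the overflow of total + max - 1); assumes total >= 0, max > 0.
    static long numGroups(long totalElements, long maxElements) {
        return totalElements / maxElements + (totalElements % maxElements == 0 ? 0 : 1);
    }

    // Group index for the element at 0-based position i:
    // consecutive runs of maxElements share a group.
    static long groupOf(long i, long maxElements) {
        return i / maxElements;
    }
}
```

For example, 250 elements with maxElements = 100 gives 3 groups: positions 0-99, 100-199, and 200-249.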
>>>>>>>>
>>>>>>>> My next idea is to implement my own Partition and PartitionDoFn
>>>>>>>> that accept a PCollectionView<Long> from Count.perElement().
>>>>>>>>
>>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>>> element quantity?
>>>>>>>>
>>>>>>>> Jacob
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
