That gist isn't working right now, but I'll update it when I find the bug.

The direct runner grows memory, but never writes files.
The dataflow runner writes temp files, but FinalizeGroupByKey never moves
them to the final destination.

Jacob

On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <[email protected]> wrote:

> Consider multiple instances of a DoFn:
>
>     @ProcessElement
>     public void window(ProcessContext context,
>         @StateId("count") ValueState<Integer> countState) {
>       int count = MoreObjects.firstNonNull(countState.read(), 0);
>       count += 1;
>       countState.write(count);
>     }
>
> If two instances read countState and then write countState, could
> countState end up incremented by only 1 instead of 2?
>
> Jacob
>
> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <[email protected]> wrote:
>
>> What do you mean by non-atomic?
>>
>> All output/state changes/timers from a processed bundle are an
>> all-or-nothing change. So if processing a bundle fails, any state changes
>> are discarded and the state is reset to what it was before the bundle was
>> processed.
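
The all-or-nothing behavior described above can be illustrated with a plain-Java sketch (this is not Beam internals; class and method names here are hypothetical): mutations made while processing a bundle are staged, and only committed if the whole bundle succeeds.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (hypothetical, not Beam code) of all-or-nothing
// bundle semantics: state mutations are staged in a copy, and the copy
// replaces the committed state only if the bundle succeeds.
class BundleStateSketch {
    static Map<String, Integer> processBundle(Map<String, Integer> committed,
                                              Iterable<String> bundle,
                                              boolean fail) {
        // Stage changes in a copy of the committed state.
        Map<String, Integer> staged = new HashMap<>(committed);
        for (String key : bundle) {
            staged.merge(key, 1, Integer::sum);
        }
        if (fail) {
            // Bundle failed: discard staged changes, keep prior state.
            return committed;
        }
        // Bundle succeeded: commit all staged changes at once.
        return staged;
    }
}
```
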
>>
>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <[email protected]>
>> wrote:
>>
>>> Here's a gist:
>>> https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>
>>> Should I consider StateId value mutations to be non-atomic?
>>>
>>> Jacob
>>>
>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Feel free to share it with an online paste or a link to a github repo
>>>> containing the code.
>>>>
>>>> Other users may be interested in your solution.
>>>>
>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]>
>>>> wrote:
>>>>
>>>>> Lukasz-
>>>>>
>>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>>> bundle. When the counter or timer exceeds its maximum, the buffered
>>>>> elements are emitted with outputWithTimestamp().
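
The flush condition described above (flush when the buffer is full or stale) can be sketched outside Beam as plain Java; names here are hypothetical, and the real DoFn would keep this in Beam state and use a timer for the staleness check rather than a wall-clock argument.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (hypothetical names, not Beam code) of the buffering
// logic: elements accumulate in a buffer, which is flushed when either
// the element count reaches maxCount or the buffer's age reaches
// maxAgeMillis.
class BatchBufferSketch {
    private final int maxCount;
    private final long maxAgeMillis;
    private final List<String> buffer = new ArrayList<>();
    private long firstElementTime = -1;

    BatchBufferSketch(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Returns the flushed batch, or an empty list if nothing was flushed.
    List<String> add(String element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(element);
        boolean full = buffer.size() >= maxCount;
        boolean stale = nowMillis - firstElementTime >= maxAgeMillis;
        if (full || stale) {
            List<String> out = new ArrayList<>(buffer);
            buffer.clear();
            firstElementTime = -1;
            return out;
        }
        return new ArrayList<>();
    }
}
```
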
>>>>>
>>>>> I'm happy to post the entire implementation somewhere, not sure about
>>>>> etiquette and how this mailing list handles attachments.
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Have you considered using a stateful DoFn? Buffering/batching based
>>>>>> upon a certain number of elements is shown in this blog [1] and could
>>>>>> be extended for your use case.
>>>>>>
>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> My first streaming pipeline is pretty simple, it just pipes a queue
>>>>>>> into files:
>>>>>>>
>>>>>>> - read JSON objects from PubsubIO
>>>>>>> - event time = processing time
>>>>>>> - 5 minute windows
>>>>>>> - write n files to GCS (TextIO.withNumShards() is not dynamic)
>>>>>>>
>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>> stopped for an hour and restarted), this creates problems: the amount
>>>>>>> of data per file becomes too large, and the pipeline stays behind.
>>>>>>>
>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>> GCS":
>>>>>>>
>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>> groups
>>>>>>>
>>>>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>>>>> accept a PCollectionView<Long> from Count.perElement().
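
The ceil(totalElements / maxElements) grouping proposed above works out to the following plain-Java sketch (helper names are hypothetical; in the actual pipeline the count would arrive as a side input):

```java
// Plain-Java sketch (hypothetical names) of the proposed partitioning
// arithmetic: split totalElements into the smallest number of groups
// such that no group exceeds maxElements, then assign elements to
// groups by hashing.
class ShardMathSketch {
    // ceil(totalElements / maxElements), in integer arithmetic; at least 1.
    static long numShards(long totalElements, long maxElements) {
        return Math.max(1, (totalElements + maxElements - 1) / maxElements);
    }

    // Assign an element to one of the shards by hashing.
    static int shardFor(Object element, long totalElements, long maxElements) {
        long shards = numShards(totalElements, maxElements);
        return (int) Math.floorMod(element.hashCode(), shards);
    }
}
```
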
>>>>>>>
>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>> element quantity?
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
