What do you mean by non-atomic?

All output/state changes/timers from a process bundle are an all or nothing
change. So if processing a bundle fails, any state changes are discarded
and the state is reset to what it was before the bundle was processed.

On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <[email protected]> wrote:

> Here's a gist: https://gist.github.com/jacobmarble/
> 6ca40e0a14828e6a0dfe89b9cb2e4b4c
>
> Should I consider StateId value mutations to be non-atomic?
>
> Jacob
>
> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Feel free to share it with an online paste or a link to a github repo
>> containing the code.
>>
>> Other users may be interested in your solution.
>>
>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]>
>> wrote:
>>
>>> Lukasz-
>>>
>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>> timestamp state, and a counter state, along with a buffer of elements to
>>> bundle. When the counter or timer exceeds max values, outputWithTimestamp().
>>>
>>> I'm happy to post the entire implementation somewhere, not sure about
>>> etiquette and how this mailing list handles attachments.
>>>
>>> Jacob
>>>
>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Have you considered using a stateful DoFn, buffering/batching based
>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>> extended for your usecase.
>>>>
>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>
>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]>
>>>> wrote:
>>>>
>>>>> My first streaming pipeline is pretty simple, it just pipes a queue
>>>>> into files:
>>>>>
>>>>> - read JSON objects from PubsubIO
>>>>> - event time = processing time
>>>>> - 5 minute windows (
>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>
>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>> stopped for an hour and restarted) this creates problems because the 
>>>>> amount
>>>>> of data per file becomes too much, and the pipeline stays behind.
>>>>>
>>>>> I believe that what is needed is a new step, just before "write to
>>>>> GCS":
>>>>>
>>>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>>>
>>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>>> accept a PCollectionView<Long> from Count.perElemen().
>>>>>
>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>> element quantity?
>>>>>
>>>>> Jacob
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to