Here's a gist:
https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c

Should I consider StateId value mutations to be non-atomic?

Jacob

On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <[email protected]> wrote:

> Feel free to share it with an online paste or a link to a github repo
> containing the code.
>
> Other users may be interested in your solution.
>
> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]> wrote:
>
>> Lukasz-
>>
>> That worked. I created a stateful DoFn with a stale timer, an initial
>> timestamp state, and a counter state, along with a buffer of elements to
>> bundle. When the counter or the timer exceeds its maximum, the buffered
>> elements are emitted with outputWithTimestamp().
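For readers following along, the flush rule described above (emit the buffer when either the element count or the age of the oldest buffered element reaches a limit) can be modeled in plain Java. This is a minimal sketch with hypothetical names (`BatchBuffer`, `maxCount`, `maxStaleMillis`). It is neither the code in the gist nor Beam code, and it assumes the stale timer measures age since the first buffered element. In an actual stateful DoFn, the buffer, counter, and first-element timestamp would live in `@StateId` state cells, and `onTimer` would be driven by a `@TimerId` timer.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Beam API) of count-or-staleness batching.
 * In Beam these fields would be held in state cells and onTimer would be a timer callback.
 */
final class BatchBuffer<T> {
    private final int maxCount;         // flush once this many elements are buffered
    private final long maxStaleMillis;  // flush once the oldest element is this old (assumption)
    private final List<T> buffer = new ArrayList<>();
    private long firstTimestampMillis = -1; // -1 means "buffer is empty"

    BatchBuffer(int maxCount, long maxStaleMillis) {
        if (maxCount <= 0 || maxStaleMillis <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxCount = maxCount;
        this.maxStaleMillis = maxStaleMillis;
    }

    /** Adds an element; returns the full batch if the count limit was reached, else an empty list. */
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstTimestampMillis = nowMillis; // plays the role of the "initial timestamp" state
        }
        buffer.add(element);
        return buffer.size() >= maxCount ? flush() : List.of();
    }

    /** Timer callback: returns the buffered batch if it has gone stale, else an empty list. */
    List<T> onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstTimestampMillis >= maxStaleMillis) {
            return flush();
        }
        return List.of();
    }

    /** Hands out the current buffer and resets the count and timestamp. */
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstTimestampMillis = -1;
        return batch;
    }
}
```

With `new BatchBuffer<>(3, 60_000)`, the third `add` returns all three elements, while a lone fourth element is only released by an `onTimer` call made at least 60 seconds after it arrived.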
>>
>> I'm happy to post the entire implementation somewhere; I'm just not sure
>> about the etiquette or how this mailing list handles attachments.
>>
>> Jacob
>>
>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]> wrote:
>>
>>> Have you considered using a stateful DoFn? Buffering/batching based upon
>>> a certain number of elements is shown in this blog post [1] and could be
>>> extended for your use case.
>>>
>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]>
>>> wrote:
>>>
>>>> My first streaming pipeline is pretty simple; it just pipes a queue
>>>> into files:
>>>>
>>>> - read JSON objects from PubsubIO
>>>> - event time = processing time
>>>> - 5-minute windows
>>>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>>>
>>>> When the pipeline gets behind (for example, when the pipeline is
>>>> stopped for an hour and restarted), this causes problems: each file
>>>> receives too much data, and the pipeline stays behind.
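The backlog effect follows from how fixed windows are assigned. Assuming timestamps are set when elements are read (event time = processing time), everything read in the catch-up burst after a restart is stamped within a few minutes, so it lands in one or two 5-minute windows while the shard count stays fixed. A minimal plain-Java sketch of epoch-aligned fixed-window assignment, using a hypothetical class `FixedWindowMath`; it is intended to mirror fixed windows with zero offset, as an assumption, and is not Beam code.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not Beam API) modeling 5-minute, epoch-aligned fixed windows. */
final class FixedWindowMath {
    static final long WINDOW_MILLIS = 5L * 60 * 1000; // 5-minute windows

    /** Start of the fixed window containing timestampMillis; floorMod keeps negative times correct. */
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    /** Counts elements per window start, e.g. for timestamps assigned at read time. */
    static Map<Long, Long> countPerWindow(long[] timestampsMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }
}
```

With a fixed `numShards`, each file in a window receives roughly that window's count divided by n, so a window that absorbs the backlog produces proportionally larger files.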
>>>>
>>>> I believe that what is needed is a new step, just before "write to GCS":
>>>>
>>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>>
>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>> accept a PCollectionView<Long> from Count.perElement().
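The ceil(totalElements / maxElements) group count can be computed without floating point, and each element then needs a group index. Below is a minimal sketch with hypothetical names (`GroupMath`, `numGroups`, `groupFor`). Two caveats are worth checking against the Beam Javadoc. First, the built-in `Partition.of(int numPartitions, PartitionFn)` takes its partition count when the pipeline is constructed, so a count known only at run time (for example, from a `PCollectionView<Long>` side input) cannot directly set the number of output PCollections; presumably this is why a custom transform is being considered. Second, `Count.perElement()` yields a count per distinct element, whereas a single total would come from `Count.globally()`.

```java
/** Hypothetical helpers (not Beam API) for splitting elements into bounded groups. */
final class GroupMath {
    /** ceil(totalElements / maxElements) in integer arithmetic, avoiding overflow of total + max. */
    static long numGroups(long totalElements, long maxElements) {
        if (totalElements < 0 || maxElements <= 0) {
            throw new IllegalArgumentException("need totalElements >= 0 and maxElements > 0");
        }
        return totalElements / maxElements + (totalElements % maxElements == 0 ? 0 : 1);
    }

    /**
     * Same shape as a PartitionFn's partitionFor(elem, numPartitions): hashes an element
     * into [0, numGroups). Hashing balances groups only approximately, so a group can
     * end up somewhat larger than maxElements.
     */
    static int groupFor(Object element, int numGroups) {
        if (numGroups <= 0) {
            throw new IllegalArgumentException("numGroups must be positive");
        }
        return Math.floorMod(element.hashCode(), numGroups);
    }
}
```

For example, 250 elements with a maximum of 100 per file gives 3 groups; an exact per-group bound would need a deterministic assignment (such as element index divided by maxElements) rather than hashing.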
>>>>
>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>> element quantity?
>>>>
>>>> Jacob
>>>>
>>>
>>>
>>
>
