Feel free to share it with an online paste or a link to a github repo
containing the code.

Other users may be interested in your solution.

On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <[email protected]> wrote:

> Lukasz-
>
> That worked. I created a stateful DoFn with a stale timer, an initial
> timestamp state, and a counter state, along with a buffer of elements to
> bundle. When the counter exceeds its max or the timer fires, I call
> outputWithTimestamp().
>
> I'm happy to post the entire implementation somewhere, not sure about
> etiquette and how this mailing list handles attachments.
>
> Jacob
>
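For readers who land on this thread later: the count-or-time flush logic Jacob describes (buffer elements, flush when either a maximum count is reached or a staleness timer fires) can be sketched independently of Beam's stateful DoFn API. This is an illustrative sketch, not the actual implementation; the class name, limits, and clock injection are all assumptions:

```python
import time


class BufferingBatcher:
    """Buffers elements and flushes when either a max element count is
    reached or the oldest buffered element is older than max_age_seconds,
    mirroring a stateful DoFn's counter state plus stale timer.
    Illustrative sketch only; not the Beam API."""

    def __init__(self, max_elements=100, max_age_seconds=60.0,
                 clock=time.monotonic):
        self.max_elements = max_elements
        self.max_age = max_age_seconds
        self.clock = clock  # injectable for testing
        self.buffer = []
        self.first_ts = None  # arrival time of the first buffered element

    def add(self, element):
        """Buffer one element; return a flushed batch, or None."""
        if not self.buffer:
            self.first_ts = self.clock()
        self.buffer.append(element)
        too_many = len(self.buffer) >= self.max_elements
        too_old = self.clock() - self.first_ts >= self.max_age
        if too_many or too_old:
            return self.flush()
        return None

    def flush(self):
        """Emit and reset the buffer (where Beam would call
        outputWithTimestamp() per element)."""
        batch, self.buffer, self.first_ts = self.buffer, [], None
        return batch
```

In the real stateful DoFn, the buffer would live in a BagState, the count in a ValueState, and the staleness check in an event- or processing-time timer callback, as in the blog post linked below.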
> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]> wrote:
>
>> Have you considered using a stateful DoFn? Buffering/batching based upon
>> a certain number of elements is shown in this blog[1] and could be
>> extended for your use case.
>>
>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]>
>> wrote:
>>
>>> My first streaming pipeline is pretty simple, it just pipes a queue into
>>> files:
>>>
>>> - read JSON objects from PubsubIO
>>> - event time = processing time
>>> - 5 minute windows
>>> - write n files to GCS (TextIO.withNumShards() is not dynamic)
>>>
>>> When the pipeline gets behind (for example, when the pipeline is stopped
>>> for an hour and restarted) this creates problems: the amount of data per
>>> file becomes too large, and the pipeline stays behind.
>>>
>>> I believe that what is needed is a new step, just before "write to GCS":
>>>
>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>
>>> My next idea is to implement my own Partition and PartitionDoFn that
>>> accept a PCollectionView<Long> from Count.perElement().
>>>
>>> Is there a more built-in way to accomplish dynamic partitions by element
>>> quantity?
>>>
>>> Jacob
>>>
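The dynamic partitioning Jacob asks about, ceil(totalElements / maxElements) groups, can be sketched as a plain function. This is a hypothetical helper for illustration, not a built-in Beam transform:

```python
import math


def partition_by_max_size(elements, max_elements):
    """Split elements into ceil(len(elements) / max_elements) round-robin
    groups, so that no group exceeds max_elements. Hypothetical helper;
    in a pipeline the total count would come from a side input."""
    if not elements:
        return []
    num_groups = math.ceil(len(elements) / max_elements)
    # Round-robin slicing keeps group sizes within one element of each
    # other, so every group holds at most max_elements.
    return [elements[i::num_groups] for i in range(num_groups)]
```

For example, 10 elements with max_elements=3 yields ceil(10/3) = 4 groups of at most 3 elements each.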
>>
>>
>
