Lukasz-

That worked. I created a stateful DoFn with a stale timer, an initial
timestamp state, and a counter state, along with a buffer of elements to
bundle. When the counter or the timer exceeds its maximum, the DoFn emits the
buffered elements with outputWithTimestamp().
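Here is a minimal plain-Java sketch of the count-or-staleness batching described above. It assumes the stale timer is measured from the first buffered element, and it deliberately avoids the Beam API. The fields stand in for the buffer, counter and initial-timestamp state cells. The hypothetical process and onTimer methods stand in for the @ProcessElement and @OnTimer callbacks, and the emitted list stands in for outputWithTimestamp(). In a real pipeline, Beam scopes state and timers per key and window, so the input would need to be keyed first.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of count-or-staleness batching; no Beam dependency.
// The fields mirror the state cells described above, and the caller drives
// "time" explicitly instead of a real Beam timer.
public class BatchingSketch {

    // Hypothetical stand-in for one key's state in a stateful DoFn.
    static final class BatchBuffer<T> {
        private final int maxCount;        // flush once this many elements are buffered
        private final long maxDelayMillis; // flush once the oldest buffered element is this stale
        private final List<T> buffer = new ArrayList<>();        // buffered elements
        private int count = 0;                                   // counter state
        private Long firstTimestamp = null;                      // initial timestamp state
        private final List<List<T>> emitted = new ArrayList<>(); // stands in for outputWithTimestamp()

        BatchBuffer(int maxCount, long maxDelayMillis) {
            this.maxCount = maxCount;
            this.maxDelayMillis = maxDelayMillis;
        }

        // Roughly what @ProcessElement would do for each incoming element.
        void process(T element, long nowMillis) {
            if (firstTimestamp == null) {
                // First element of a new batch: a real DoFn would also set the stale timer here.
                firstTimestamp = nowMillis;
            }
            buffer.add(element);
            count++;
            if (count >= maxCount) {
                flush();
            }
        }

        // Roughly what @OnTimer would do when the stale timer fires.
        void onTimer(long nowMillis) {
            if (firstTimestamp != null && nowMillis - firstTimestamp >= maxDelayMillis) {
                flush();
            }
        }

        // Emit the current batch (if any) and reset all per-batch state.
        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            emitted.add(new ArrayList<>(buffer));
            buffer.clear();
            count = 0;
            firstTimestamp = null;
        }

        List<List<T>> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        BatchBuffer<String> b = new BatchBuffer<>(3, 1000);
        b.process("a", 0);
        b.process("b", 10);
        b.process("c", 20);  // count limit reached: emits [a, b, c]
        b.process("d", 30);
        b.onTimer(500);      // only 470 ms since "d" arrived: nothing emitted
        b.onTimer(1030);     // 1000 ms since "d" arrived: emits [d]
        System.out.println(b.emitted()); // prints [[a, b, c], [d]]
    }
}
```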

I'm happy to post the entire implementation somewhere, but I'm not sure about
the etiquette or how this mailing list handles attachments.

Jacob

On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <[email protected]> wrote:

> Have you considered using a stateful DoFn? Buffering/batching based upon a
> certain number of elements is shown in this blog[1] and could be extended
> for your use case.
>
> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <[email protected]> wrote:
>
>> My first streaming pipeline is pretty simple; it just pipes a queue into
>> files:
>>
>> - read JSON objects from PubsubIO
>> - event time = processing time
>> - 5-minute windows
>> - write n files to GCS (TextIO.withNumShards(), not dynamic)
>>
>> When the pipeline falls behind (for example, when it is stopped for an
>> hour and restarted), this creates problems: each file receives too much
>> data, and the pipeline stays behind.
>>
>> I believe that what is needed is a new step, just before "write to GCS":
>>
>> - split/partition/window into ceil(totalElements / maxElements) groups
>>
>> My next idea is to implement my own Partition and PartitionDoFn that
>> accept a PCollectionView<Long> from Count.perElement().
>>
>> Is there a more built-in way to accomplish dynamic partitions by element
>> quantity?
>>
>> Jacob
>>
>
>
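The split step proposed in the original message, ceil(totalElements / maxElements) groups, can be sketched in plain Java (again without the Beam API). The integer form (total + max - 1) / max computes the ceiling without floating point. Assigning the element at position i to group i % groups keeps every group at or below maxElements: since groups >= total / maxElements, each group receives at most ceil(total / groups) <= maxElements elements. The sequential index is an assumption that a distributed PCollection does not provide, and numGroups and groupFor are hypothetical helper names. Note that Beam's Partition.of takes its number of partitions when the pipeline is constructed, which is part of why a count computed at run time is awkward to feed into it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (no Beam) of splitting elements into
// ceil(totalElements / maxElements) groups of at most maxElements each.
public class CountPartitionSketch {

    // ceil(total / max) using integer arithmetic, avoiding floating point.
    static int numGroups(long totalElements, long maxElements) {
        if (maxElements <= 0) {
            throw new IllegalArgumentException("maxElements must be positive");
        }
        return (int) ((totalElements + maxElements - 1) / maxElements);
    }

    // Hypothetical partition function: round-robin by sequential index.
    static int groupFor(long index, int numGroups) {
        return (int) (index % numGroups);
    }

    public static void main(String[] args) {
        long total = 10;
        long max = 4;
        int groups = numGroups(total, max); // ceil(10 / 4) = 3
        List<List<Long>> parts = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            parts.add(new ArrayList<>());
        }
        for (long i = 0; i < total; i++) {
            parts.get(groupFor(i, groups)).add(i);
        }
        // prints 3 groups: [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
        System.out.println(groups + " groups: " + parts);
    }
}
```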
