Re: How to window by quantity of data?

2017-10-20 Thread Jacob Marble
The final, working version is in the original gist:
https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c

The result is heavily inspired by GroupIntoBatches, and doesn't use
windowing.
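For context, the core batch-splitting behavior (at most N elements per batch, per key) can be sketched in plain Java. This is an illustrative stand-in for what GroupIntoBatches does with one key's values, not Beam's API:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {

    // Split a list of values into batches of at most maxBatchSize elements,
    // mirroring what GroupIntoBatches.ofSize(maxBatchSize) does for the
    // values of a single key.
    static <T> List<List<T>> intoBatches(List<T> values, int maxBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < values.size(); i += maxBatchSize) {
            batches.add(new ArrayList<>(
                values.subList(i, Math.min(i + maxBatchSize, values.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        // 7 elements with a batch size of 3 -> two full batches and one partial
        System.out.println(intoBatches(List.of(1, 2, 3, 4, 5, 6, 7), 3));
        // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

The last batch may be smaller than the cap, which is why no windowing is needed: batch boundaries come from element counts, not time.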

Jacob



Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Thomas, I reworked using the GroupIntoBatches PTransform, and things are
working great (with fewer lines of code).

Thanks

Jacob



Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
That gist isn't working right now, but I'll update it when I find the bug.

The direct runner grows memory but never writes files.
The Dataflow runner writes temp files, but FinalizeGroupByKey never moves
them to the final destination.

Jacob



Re: How to window by quantity of data?

2017-10-18 Thread Thomas Groh
The calls are essentially atomic per-key.

More specifically, the two calls can occur in one of two ways:

1) They are for elements which share a key. If so, the calls _must_ be made
serially, so the second read() will see the result of the first write().
2) They are for elements which do not share a key. If so, the state
instances are unique and independent, so both will contain 1.

As an aside, I unfortunately missed some of this thread - part of the change
may duplicate the 'GroupIntoBatches' PTransform within the Core SDK.
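The two cases above can be modeled with a per-key counter map. This plain-Java stand-in for keyed ValueState is illustrative, not Beam code:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedCounter {
    private final Map<String, Integer> state = new HashMap<>();

    // Case 1: same key -> calls are serialized, so the second read sees
    //         the first write.
    // Case 2: different keys -> independent state cells, each ends at 1.
    int increment(String key) {
        int count = state.getOrDefault(key, 0) + 1; // read-modify-write per key
        state.put(key, count);
        return count;
    }

    public static void main(String[] args) {
        KeyedCounter c = new KeyedCounter();
        System.out.println(c.increment("a")); // same key, first call: 1
        System.out.println(c.increment("a")); // same key, serialized: 2
        System.out.println(c.increment("b")); // different key, independent: 1
    }
}
```

The lost-update scenario Jacob worried about would require two concurrent read-modify-write cycles on the same cell, which the per-key serialization rules out.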



Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Consider multiple instances of a DoFn:

@ProcessElement
public void window(ProcessContext context,
    @StateId("count") ValueState<Integer> countState) {

  int count = MoreObjects.firstNonNull(countState.read(), 0);
  count += 1;
  countState.write(count);
}

If two instances read countState and then write countState, won't countState
be incremented by 1 rather than by 2?

Jacob



Re: How to window by quantity of data?

2017-10-18 Thread Lukasz Cwik
What do you mean by non-atomic?

All output, state changes, and timers from a processed bundle are an
all-or-nothing change. So if processing a bundle fails, any state changes are
discarded and the state is reset to what it was before the bundle was
processed.
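The all-or-nothing bundle semantics can be sketched as commit-on-success: stage the bundle's state changes, then either commit them all or discard them. This is an illustrative model, not how any runner is actually implemented:

```java
import java.util.HashMap;
import java.util.Map;

public class BundleCommit {
    private final Map<String, Integer> committed = new HashMap<>();

    // Process a bundle against a staged copy; commit only if every
    // element succeeds.
    boolean processBundle(Iterable<String> bundle) {
        Map<String, Integer> staged = new HashMap<>(committed);
        try {
            for (String key : bundle) {
                if (key == null) throw new IllegalStateException("element failed");
                staged.merge(key, 1, Integer::sum);
            }
        } catch (RuntimeException e) {
            return false; // failure: staged changes discarded, committed state untouched
        }
        committed.putAll(staged); // success: all changes become visible at once
        return true;
    }

    Integer read(String key) { return committed.get(key); }

    public static void main(String[] args) {
        BundleCommit state = new BundleCommit();
        state.processBundle(java.util.List.of("a", "a"));        // commits: a -> 2
        state.processBundle(java.util.Arrays.asList("a", null)); // fails, discarded
        System.out.println(state.read("a"));                     // still 2
    }
}
```

A retried bundle therefore starts from the last committed state, never from a half-applied one.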

On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble  wrote:

> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>
> Should I consider StateId value mutations to be non-atomic?
>
> Jacob
>


Re: How to window by quantity of data?

2017-10-18 Thread Lukasz Cwik
Feel free to share it with an online paste or a link to a github repo
containing the code.

Other users may be interested in your solution.



Re: How to window by quantity of data?

2017-10-17 Thread Jacob Marble
Lukasz-

That worked. I created a stateful DoFn with a stale timer, an initial
timestamp state, and a counter state, along with a buffer of elements to
bundle. When the counter or timer exceeds max values, outputWithTimestamp().
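The count-or-staleness flush described above can be modeled in plain Java. Names and structure here are illustrative, not Jacob's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class CountOrAgeBuffer {
    private final int maxCount;
    private final long maxAgeMillis;
    private final List<String> buffer = new ArrayList<>();
    private long firstElementTime = -1; // stand-in for the initial-timestamp state

    CountOrAgeBuffer(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Add an element; return a flushed batch when the count cap or the age
    // cap is hit, otherwise null. "now" is passed in so the logic is testable;
    // in a stateful DoFn the age cap would fire via a timer instead.
    List<String> add(String element, long now) {
        if (buffer.isEmpty()) firstElementTime = now;
        buffer.add(element);
        boolean full = buffer.size() >= maxCount;
        boolean stale = now - firstElementTime >= maxAgeMillis;
        if (full || stale) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }

    public static void main(String[] args) {
        CountOrAgeBuffer buf = new CountOrAgeBuffer(3, 1000);
        buf.add("a", 0);
        buf.add("b", 10);
        System.out.println(buf.add("c", 20).size()); // 3: count cap hit
    }
}
```

The staleness cap bounds latency when traffic is slow; the count cap bounds batch size when traffic is heavy.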

I'm happy to post the entire implementation somewhere, not sure about
etiquette and how this mailing list handles attachments.

Jacob

On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik  wrote:

> Have you considered using a stateful DoFn? Buffering/batching based on a
> certain number of elements is shown in this blog[1] and could be extended
> for your use case.
>
> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html


How to window by quantity of data?

2017-10-17 Thread Jacob Marble
My first streaming pipeline is pretty simple; it just pipes a queue into
files:

- read JSON objects from PubsubIO
- event time = processing time
- 5-minute windows
- write n files to GCS (TextIO.withNumShards() is not dynamic)

When the pipeline gets behind (for example, when the pipeline is stopped for
an hour and restarted), this creates problems: the amount of data per file
becomes too large, and the pipeline stays behind.

I believe that what is needed is a new step, just before "write to GCS":

- split/partition/window into ceil(totalElements / maxElements) groups
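The group-count arithmetic in that step, together with a simple round-robin assignment, can be sketched as follows (illustrative only):

```java
public class ShardMath {
    // Number of groups needed so that no group exceeds maxElements.
    static long numShards(long totalElements, long maxElements) {
        if (totalElements == 0) return 0;
        return (totalElements + maxElements - 1) / maxElements; // integer ceil
    }

    // Round-robin shard assignment for the i-th element.
    static long shardFor(long index, long numShards) {
        return index % numShards;
    }

    public static void main(String[] args) {
        // A 1M-element backlog with a 64k-per-file cap needs 16 groups.
        System.out.println(numShards(1_000_000, 64_000)); // prints 16
    }
}
```

The catch, as the thread goes on to show, is that totalElements isn't known up front in a streaming pipeline, which is what pushes the solution toward stateful batching instead.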

My next idea is to implement my own Partition and PartitionDoFn that accept
a PCollectionView from Count.perElement().

Is there a more built-in way to accomplish dynamic partitions by element
quantity?

Jacob