Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Robert Bradshaw
The direct runner blows up memory usage as well.

Currently, the size limitations and performance characteristics of BagState
are a property of the runner. I don't know if this kind of thing can or
should be lifted into the model itself, though it might make sense to publish
expectations and recommendations for what it means to be a "performant" runner.

I think the comment about regressing is specific to the Dataflow runner, in
that it might get better, but we won't make it worse.

Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Beau Fabry
Could I get some clarification on "But the property of being able to exceed
memory won't be regressed, whatever strategy is adopted"? It seems like both
Dataflow in batch mode and Flink in all modes could currently blow up memory
usage if you attempt to read a large BagState, so should I take that to
mean that a design that assumes a BagState can be arbitrarily large is
inappropriate?


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Kenneth Knowles
Yes, currently the Dataflow runner does read BagState lazily in streaming
mode. In batch mode it uses in-memory state at the moment.

But I want to re-emphasize that the APIs are designed to allow many
possible implementations and styles of optimization, so it can (and likely
will) change over time. But the property of being able to exceed memory
won't be regressed, whatever strategy is adopted.

Kenn


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Aljoscha Krettek
Just to confirm: yes, the Flink Runner does not (currently) read lazily.

Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Beau Fabry
Ok, this might be out of the scope of the mailing list, but do you know if
the Dataflow runner specifically reads BagState lazily? Our reading of the
Flink runner code is that it does not.


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Robert Bradshaw
Yes, the intent is that runners can (should?) support streaming of BagState
reads. Of course reading a large state every time could still be
expensive... (One of the key features of BagState is that it supports blind
writes, i.e. you can append cheaply without reading).
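
Robert's two properties — blind writes and streamed reads — can be illustrated
with a small conceptual sketch. This is plain Python with invented names
(SketchBagState, reads_from_backend, PAGE_SIZE), not Beam's BagState API or any
runner's actual storage layer; it only models the access pattern being discussed:

```python
class SketchBagState:
    """Toy model of a runner-backed bag state (hypothetical, not Beam's API).

    - add() is a "blind write": it only buffers the value, never reading storage.
    - read() returns a lazy generator that pages values in, so a large bag
      never has to be materialized in memory all at once.
    """

    PAGE_SIZE = 2

    def __init__(self, backend):
        self.backend = backend          # simulated durable storage (a list)
        self.pending = []               # buffered blind writes
        self.reads_from_backend = 0     # instrumentation for the example

    def add(self, value):
        # Blind write: cheap append, no read of existing state required.
        self.pending.append(value)

    def flush(self):
        self.backend.extend(self.pending)
        self.pending.clear()

    def read(self):
        # Lazily stream the bag one page at a time instead of loading it all.
        self.flush()
        for start in range(0, len(self.backend), self.PAGE_SIZE):
            self.reads_from_backend += 1
            yield from self.backend[start:start + self.PAGE_SIZE]


state = SketchBagState(backend=[])
for v in [1, 2, 3, 4, 5]:
    state.add(v)                        # five appends, zero storage reads
assert state.reads_from_backend == 0    # blind writes did not touch storage
assert list(state.read()) == [1, 2, 3, 4, 5]
```

The point of the sketch is that appends stay cheap no matter how large the bag
grows, while a full read still has to stream every element — which is exactly
why reading a large state on every element could still be expensive.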


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Beau Fabry
Awesome, thanks for the answers. I have one more: Is the Iterable returned
by BagState backed by some sort of lazy-loading from disk, like the
Iterable result in a GroupByKey, as mentioned here
http://stackoverflow.com/questions/34136919/cogbkresult-has-more-than-1-elements-reiteration-which-may-be-slow-is-requ/34140487#34140487 ?

We're joining some infrequently changing small amount of data (per-key, but
still far too big globally to fit in memory as a sideinput) to a very large
frequently updated stream and trying to figure out how to reduce our
redundant outputs.

Cheers,
Beau

On Tue, Feb 21, 2017 at 8:04 PM Kenneth Knowles  wrote:

> Hi Beau,
>
> I've answered inline.
>
> On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry  wrote:
>
> In times when communicating between outputs would be an optimisation, and
> reducing inputs in parallel would be an optimisation, how do you make the
> call between using a stateful dofn or a Combine.perKey?
>
>
> (Standard optimization caveat: you'd make the call by measuring them both)
>
> Usually you will be forced to make this decision not based on
> optimization, but based on whether your use case can be expressed naturally
> as one or the other.
>
> The key difference is just the one I talk about in the blog post, the
> associative `merge` operation. This enables nice optimizations so if your
> use case fits nicely as a CombineFn then you should probably use it. There
> are exceptions, like fine-grained access to just a piece of your
> accumulator state, but the most common way to combine things is with
> Combine :-)
>
> You can always follow a Combine.perKey with a stateful ParDo that
> communicates between outputs, if that also presents a major savings.
>
> Is there a case to be made that CombineFn could also get access to state
> within extractOutput? It seems like the only benefit to a CombineFn now is
> that the merge and addInput steps can run on multiple workers, is there a
> rule of thumb to know if we have enough data per key that that is
> significant?
>
>
> This is a big benefit a lot of the time, especially in large batch
> executions. Without this optimization opportunity, a Combine.perKey would
> execute by first doing a GroupByKey - shuffling all of your data - and then
> applying your CombineFn to each KV<K, Iterable<InputT>>. But instead, this can
> be optimized so that the combine happens before the shuffle, which can
> reduce the shuffled data to a single accumulator per key. This optimization
> is general enough that my description applies to all runners, though it has
> lesser benefits in streaming executions where there is not as much data per
> key+window+triggering.
>
> Kenn
>
>
>
> Cheers,
> Beau
>
> On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía  wrote:
>
> Great post, I like the use of the previous figure style with geometric
> forms and colors, as well as the table analogy that really helps to
> understand the concepts. I am still digesting some of the consequences of
> the State API, in particular the implications of using state that you
> mention at the end. Really good to discuss those also as part of the post.
>
> I found some small typos and formatting issues that I addressed here.
> https://github.com/apache/beam-site/pull/156
>
> Thanks for writing,
> Ismaël
>
>
> On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré 
> wrote:
>
> Hey Ken
>
> Just take a quick look and it's a great post !
>
> Thanks
> Regards
> JB
> On Feb 13, 2017, at 18:44, Kenneth Knowles  wrote:
>
> Hi all,
>
> I've just published a blog post about Beam's new stateful processing
> capabilities:
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> The blog post covers stateful processing from a few angles: how it works,
> how it fits into the Beam model, what you might use it for, and finally
> some examples of stateful Beam code.
>
> I'd love for you to take a look and see how this feature might apply to
> your use of Beam. And, of course, I'd also love to hear from you about it.
>
> Kenn
>
>
>


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-21 Thread Kenneth Knowles
Hi Beau,

I've answered inline.

On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry  wrote:
>
> In times when communicating between outputs would be an optimisation, and
> reducing inputs in parallel would be an optimisation, how do you make the
> call between using a stateful dofn or a Combine.perKey?
>

(Standard optimization caveat: you'd make the call by measuring them both)

Usually you will be forced to make this decision not based on optimization,
but based on whether your use case can be expressed naturally as one or the
other.

The key difference is just the one I talk about in the blog post, the
associative `merge` operation. This enables nice optimizations so if your
use case fits nicely as a CombineFn then you should probably use it. There
are exceptions, like fine-grained access to just a piece of your
accumulator state, but the most common way to combine things is with
Combine :-)

You can always follow a Combine.perKey with a stateful ParDo that
communicates between outputs, if that also presents a major savings.

> Is there a case to be made that CombineFn could also get access to state
> within extractOutput? It seems like the only benefit to a CombineFn now is
> that the merge and addInput steps can run on multiple workers, is there a
> rule of thumb to know if we have enough data per key that that is
> significant?
>

This is a big benefit a lot of the time, especially in large batch
executions. Without this optimization opportunity, a Combine.perKey would
execute by first doing a GroupByKey - shuffling all of your data - and then
applying your CombineFn to each KV<K, Iterable<V>>. But instead, this can
be optimized so that the combine happens before the shuffle, which can
reduce the shuffled data to a single accumulator per key. This optimization
is general enough that my description applies to all runners, though it has
lesser benefits in streaming executions where there is not as much data per
key+window+triggering.
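The pre-shuffle combining described above can be sketched in plain Python. This is not the Beam API and the names (`combine_per_key`, `add_input`, etc.) are illustrative stand-ins for CombineFn's callbacks; it just shows why only one accumulator per key per worker crosses the shuffle boundary:

```python
from collections import defaultdict

# Hypothetical CombineFn-style callbacks for a per-key sum.
def create_accumulator():
    return 0

def add_input(acc, value):
    return acc + value

def merge_accumulators(accs):
    return sum(accs)

def extract_output(acc):
    return acc

def combine_per_key(shards):
    """Simulate Combine.perKey with combiner lifting across worker shards."""
    # Phase 1 (pre-shuffle, per worker): fold raw inputs into accumulators,
    # so each worker ships at most one accumulator per key.
    per_worker = []
    for shard in shards:
        accs = defaultdict(create_accumulator)
        for key, value in shard:
            accs[key] = add_input(accs[key], value)
        per_worker.append(accs)
    # Phase 2 (post-shuffle): merge the per-worker accumulators by key.
    merged = defaultdict(list)
    for accs in per_worker:
        for key, acc in accs.items():
            merged[key].append(acc)
    return {k: extract_output(merge_accumulators(v)) for k, v in merged.items()}

shards = [[("a", 1), ("b", 2), ("a", 3)], [("a", 4), ("b", 5)]]
print(combine_per_key(shards))  # {'a': 8, 'b': 7}
```

Note that phase 1 is only legal because `merge_accumulators` is associative and commutative, which is exactly the property a stateful DoFn does not promise.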

Kenn


>
> Cheers,
> Beau
>
> On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía  wrote:
>
>> Great post, I like the use of the previous figure style with geometric
>> forms and colors, as well as the table analogy that really helps to
>> understand the concepts. I am still digesting some of the consequences of
>> the State API, in particular the implications of using state that you
>> mention at the end. Really good to discuss those also as part of the post.
>>
>> I found some small typos and formatting issues that I addressed here.
>> https://github.com/apache/beam-site/pull/156
>>
>> Thanks for writing,
>> Ismaël
>>
>>
>> On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré 
>> wrote:
>>
>> Hey Ken
>>
>> Just take a quick look and it's a great post !
>>
>> Thanks
>> Regards
>> JB
>> On Feb 13, 2017, at 18:44, Kenneth Knowles  wrote:
>>
>> Hi all,
>>
>> I've just published a blog post about Beam's new stateful processing
>> capabilities:
>>
>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>
>> The blog post covers stateful processing from a few angles: how it
>> works, how it fits into the Beam model, what you might use it for, and
>> finally some examples of stateful Beam code.
>>
>> I'd love for you to take a look and see how this feature might apply to
>> your use of Beam. And, of course, I'd also love to hear from you about it.
>>
>> Kenn
>>
>>
>>


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-21 Thread Beau Fabry
Can someone just confirm my understanding of when to use the various KV
combination methods:

  * GroupByKey/CoGroupByKey -- use if no incremental reduction of your
inputs is possible, and no useful information can be communicated between
outputs
  * Combine.perKey -- use if you can reduce your inputs in parallel, but no
useful information can be communicated between extractOutput calls
  * Stateful DoFn -- use if you cannot reduce your inputs in parallel, and
need to communicate information between outputs

In times when communicating between outputs would be an optimisation, and
reducing inputs in parallel would be an optimisation, how do you make the
call between using a stateful DoFn or a Combine.perKey? Is there a case to
be made that CombineFn could also get access to state within extractOutput?
It seems like the only benefit of a CombineFn now is that the merge and
addInput steps can run on multiple workers; is there a rule of thumb to
know whether we have enough data per key for that to be significant?
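The third bullet above (per-key state communicating between outputs) can be sketched in plain Python. This is not the Beam API; `process_stream` and the dict standing in for per-key ValueState are illustrative assumptions, here used to suppress redundant outputs for a key:

```python
def process_stream(events):
    """Emit a (key, value) pair only when the value for that key changes."""
    last_seen = {}  # stands in for per-key ValueState
    outputs = []
    for key, value in events:
        if last_seen.get(key) != value:   # state read
            outputs.append((key, value))  # output depends on prior outputs
            last_seen[key] = value        # state write
    return outputs

events = [("a", 1), ("a", 1), ("a", 2), ("b", 9), ("a", 2), ("b", 9)]
print(process_stream(events))  # [('a', 1), ('a', 2), ('b', 9)]
```

Unlike a CombineFn, this computation depends on the order of elements within each key and cannot be merged associatively, which is why a runner cannot lift it ahead of the shuffle.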

Cheers,
Beau

On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía  wrote:

> Great post, I like the use of the previous figure style with geometric
> forms and colors, as well as the table analogy that really helps to
> understand the concepts. I am still digesting some of the consequences of
> the State API, in particular the implications of using state that you
> mention at the end. Really good to discuss those also as part of the post.
>
> I found some small typos and formatting issues that I addressed here.
> https://github.com/apache/beam-site/pull/156
>
> Thanks for writing,
> Ismaël
>
>
> On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré 
> wrote:
>
> Hey Ken
>
> Just take a quick look and it's a great post !
>
> Thanks
> Regards
> JB
> On Feb 13, 2017, at 18:44, Kenneth Knowles  wrote:
>
> Hi all,
>
> I've just published a blog post about Beam's new stateful processing
> capabilities:
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> The blog post covers stateful processing from a few angles: how it works,
> how it fits into the Beam model, what you might use it for, and finally
> some examples of stateful Beam code.
>
> I'd love for you to take a look and see how this feature might apply to
> your use of Beam. And, of course, I'd also love to hear from you about it.
>
> Kenn
>
>
>


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-15 Thread Ismaël Mejía
Great post, I like the use of the previous figure style with geometric
forms and colors, as well as the table analogy that really helps to
understand the concepts. I am still digesting some of the consequences of
the State API, in particular the implications of using state that you
mention at the end. Really good to discuss those also as part of the post.

I found some small typos and formatting issues that I addressed here.
https://github.com/apache/beam-site/pull/156

Thanks for writing,
Ismaël


On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré 
wrote:

> Hey Ken
>
> Just take a quick look and it's a great post !
>
> Thanks
> Regards
> JB
> On Feb 13, 2017, at 18:44, Kenneth Knowles  wrote:
>>
>> Hi all,
>>
>> I've just published a blog post about Beam's new stateful processing
>> capabilities:
>>
>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>
>> The blog post covers stateful processing from a few angles: how it
>> works, how it fits into the Beam model, what you might use it for, and
>> finally some examples of stateful Beam code.
>>
>> I'd love for you to take a look and see how this feature might apply to
>> your use of Beam. And, of course, I'd also love to hear from you about it.
>>
>> Kenn
>>
>


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-14 Thread Jean-Baptiste Onofré
Hey Ken

Just took a quick look, and it's a great post!

Thanks
Regards
JB

On Feb 13, 2017, at 18:44, Kenneth Knowles  wrote:
>Hi all,
>
>I've just published a blog post about Beam's new stateful processing
>capabilities:
>
>https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
>The blog post covers stateful processing from a few angles: how it
>works,
>how it fits into the Beam model, what you might use it for, and finally
>some examples of stateful Beam code.
>
>I'd love for you to take a look and see how this feature might apply to
>your use of Beam. And, of course, I'd also love to hear from you about
>it.
>
>Kenn


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-14 Thread Aljoscha Krettek
Very nice post! I especially like how concise yet explicit the DoFn API is
when it comes to state and timers.

On Mon, 13 Feb 2017 at 23:44 Kenneth Knowles  wrote:

> Hi all,
>
> I've just published a blog post about Beam's new stateful processing
> capabilities:
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> The blog post covers stateful processing from a few angles: how it works,
> how it fits into the Beam model, what you might use it for, and finally
> some examples of stateful Beam code.
>
> I'd love for you to take a look and see how this feature might apply to
> your use of Beam. And, of course, I'd also love to hear from you about it.
>
> Kenn
>


New blog post: "Stateful processing with Apache Beam"

2017-02-13 Thread Kenneth Knowles
Hi all,

I've just published a blog post about Beam's new stateful processing
capabilities:

https://beam.apache.org/blog/2017/02/13/stateful-processing.html

The blog post covers stateful processing from a few angles: how it works,
how it fits into the Beam model, what you might use it for, and finally
some examples of stateful Beam code.

I'd love for you to take a look and see how this feature might apply to
your use of Beam. And, of course, I'd also love to hear from you about it.

Kenn