Re: Stateful updating and deterministic routing

2018-05-07 Thread Ning Wang
I see. Then the doc I was reading might not be it either.

I will ask Maosong then.

Thanks for the info!

On Mon, May 7, 2018 at 9:23 AM, Bill Graham  wrote:

> Yeah, that's not it. The stateful scaling part of that doc got lengthy
> enough that we broke it into a doc of it's own, per Sanjeev's suggestion
> IIRC. The fact that I can't locate it makes me think it was a twitter doc
> of mine (although it was not Twitter-specific), which I'm sure was shared
> with Sanjeev, Maosong and probably Karthik. If you can find it, please
> share.
>
> On Mon, May 7, 2018 at 12:41 AM, Ning Wang  wrote:
>
>> Thanks Karthik. The doc is not exactly the same but close enough.
>>
>> It seems my doc is an internal one so let's use your doc as reference. I
>> will see if there is any major differences and comment.
>>
>> On Sun, May 6, 2018 at 1:22 PM, Karthik Ramasamy 
>> wrote:
>>
>>> Here it is
>>>
>>> https://docs.google.com/document/d/1YDFNvLTX6Sg3WDrNFKiWLaJv
>>> uEtK4eyxEaA0w9cVlG4/edit#heading=h.d6uy2uxfs2xq
>>>
>>> cheers
>>> /karthik
>>>
>>>
>>> On Sun, May 6, 2018 at 8:20 AM, Bill Graham 
>>> wrote:
>>>
 Can you share the doc please?

 On Sat, May 5, 2018 at 4:18 PM Ning Wang  wrote:

 > Thanks.
 >
 > Yeah I have read the design doc. It has a section for scaling and
 covers
 > some designs but not reaching this level of details I am afraid.
 >
 > On Sat, May 5, 2018 at 9:45 AM, Bill Graham 
 wrote:
 >
 >> The stateful processing design included a large section on scaling,
 which
 >> was intended to be done as a future phase. It's very similar to
 what's
 >> being described. Sanjeev and I worked on it about a 1.5 years ago
 with
 >> Maosong and it was in a google doc. Sanjeev do you have that design
 doc? I
 >> can't seem locate it.
 >>
 >> On Sat, May 5, 2018 at 12:03 AM, Ning Wang 
 wrote:
 >>
 >> > If we go this way, we need key -> state map for each component so
 that
 >> the
 >> > state data can be repartitioned.
 >> >
 >> > On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy <
 kart...@streaml.io>
 >> > wrote:
 >> >
 >> > > Instead - if it references
 >> > >
 >> > > topology name + component name + key range
 >> > >
 >> > > will it be better?
 >> > >
 >> > > cheers
 >> > > /karthik
 >> > >
 >> > >
 >> > > On Fri, May 4, 2018 at 11:23 PM, Ning Wang 
 >> wrote:
 >> > >
 >> > > > Currently I think each Instance serializes the state object
 into a
 >> byte
 >> > > > array and checkpoint manager saves the byte array into a file.
 The
 >> file
 >> > > is
 >> > > > referenced by topology name + component name + instance id.
 >> > > >
 >> > > > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy <
 >> kart...@streaml.io>
 >> > > > wrote:
 >> > > >
 >> > > > > I am not sure I understand why the state is tied to an
 instance?
 >> > > > >
 >> > > > > cheers
 >> > > > > /karthik
 >> > > > >
 >> > > > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper <
 >> > tom.n.coo...@gmail.com>
 >> > > > > wrote:
 >> > > > >
 >> > > > > > Yeah, state recovery is a bit more difficult with Heron's
 >> > > architecture.
 >> > > > > In
 >> > > > > > Storm, the task IDs are not just values used for routing
 they
 >> > > actually
 >> > > > > > equate to a task instance within the executor. An executor
 which
 >> > > > > currently
 >> > > > > > processes the keys 4-8 actually contains 5 task instances
 of the
 >> > same
 >> > > > > > component. So for each task, they just save its state
 attached
 >> to
 >> > the
 >> > > > > > single task ID and reassemble executors with the new task
 >> > instances.
 >> > > > > >
 >> > > > > > We don't want or have to do that with Heron instances but
 we
 >> would
 >> > > need
 >> > > > > to
 >> > > > > > have some way to have a state change tied to the task (or
 >> routing
 >> > key
 >> > > > if
 >> > > > > we
 >> > > > > > go to the key range idea). For something like a word count
 you
 >> > might
 >> > > > save
 >> > > > > > counts using a nested map like: { routing key : {word :
 count
 >> }}.
 >> > The
 >> > > > > > routing key could be included in the Tuple instance.
 However,
 >> > whether
 >> > > > > this
 >> > > > > > pattern would work for more generic state cases I don't
 know?
 >> > > > > >
 >> > > > > > Tom Cooper
 >> > > > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
 >> > > > > > 
 >> > > > > >
 >> > > > > >
 

Re: Stateful updating and deterministic routing

2018-05-07 Thread Bill Graham
Yeah, that's not it. The stateful scaling part of that doc got lengthy
enough that we broke it into a doc of its own, per Sanjeev's suggestion,
IIRC. The fact that I can't locate it makes me think it was a Twitter doc
of mine (although it was not Twitter-specific), which I'm sure was shared
with Sanjeev, Maosong, and probably Karthik. If you can find it, please
share.

On Mon, May 7, 2018 at 12:41 AM, Ning Wang  wrote:

> Thanks Karthik. The doc is not exactly the same but close enough.
>
> It seems my doc is an internal one so let's use your doc as reference. I
> will see if there is any major differences and comment.
>
> On Sun, May 6, 2018 at 1:22 PM, Karthik Ramasamy 
> wrote:
>
>> Here it is
>>
>> https://docs.google.com/document/d/1YDFNvLTX6Sg3WDrNFKiWLaJv
>> uEtK4eyxEaA0w9cVlG4/edit#heading=h.d6uy2uxfs2xq
>>
>> cheers
>> /karthik
>>
>>
>> On Sun, May 6, 2018 at 8:20 AM, Bill Graham  wrote:
>>
>>> Can you share the doc please?
>>>
>>> On Sat, May 5, 2018 at 4:18 PM Ning Wang  wrote:
>>>
>>> > Thanks.
>>> >
>>> > Yeah I have read the design doc. It has a section for scaling and
>>> covers
>>> > some designs but not reaching this level of details I am afraid.
>>> >
>>> > On Sat, May 5, 2018 at 9:45 AM, Bill Graham 
>>> wrote:
>>> >
>>> >> The stateful processing design included a large section on scaling,
>>> which
>>> >> was intended to be done as a future phase. It's very similar to what's
>>> >> being described. Sanjeev and I worked on it about a 1.5 years ago with
>>> >> Maosong and it was in a google doc. Sanjeev do you have that design
>>> doc? I
>>> >> can't seem locate it.
>>> >>
>>> >> On Sat, May 5, 2018 at 12:03 AM, Ning Wang 
>>> wrote:
>>> >>
>>> >> > If we go this way, we need key -> state map for each component so
>>> that
>>> >> the
>>> >> > state data can be repartitioned.
>>> >> >
>>> >> > On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy <
>>> kart...@streaml.io>
>>> >> > wrote:
>>> >> >
>>> >> > > Instead - if it references
>>> >> > >
>>> >> > > topology name + component name + key range
>>> >> > >
>>> >> > > will it be better?
>>> >> > >
>>> >> > > cheers
>>> >> > > /karthik
>>> >> > >
>>> >> > >
>>> >> > > On Fri, May 4, 2018 at 11:23 PM, Ning Wang 
>>> >> wrote:
>>> >> > >
>>> >> > > > Currently I think each Instance serializes the state object
>>> into a
>>> >> byte
>>> >> > > > array and checkpoint manager saves the byte array into a file.
>>> The
>>> >> file
>>> >> > > is
>>> >> > > > referenced by topology name + component name + instance id.
>>> >> > > >
>>> >> > > > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy <
>>> >> kart...@streaml.io>
>>> >> > > > wrote:
>>> >> > > >
>>> >> > > > > I am not sure I understand why the state is tied to an
>>> instance?
>>> >> > > > >
>>> >> > > > > cheers
>>> >> > > > > /karthik
>>> >> > > > >
>>> >> > > > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper <
>>> >> > tom.n.coo...@gmail.com>
>>> >> > > > > wrote:
>>> >> > > > >
>>> >> > > > > > Yeah, state recovery is a bit more difficult with Heron's
>>> >> > > architecture.
>>> >> > > > > In
>>> >> > > > > > Storm, the task IDs are not just values used for routing
>>> they
>>> >> > > actually
>>> >> > > > > > equate to a task instance within the executor. An executor
>>> which
>>> >> > > > > currently
>>> >> > > > > > processes the keys 4-8 actually contains 5 task instances
>>> of the
>>> >> > same
>>> >> > > > > > component. So for each task, they just save its state
>>> attached
>>> >> to
>>> >> > the
>>> >> > > > > > single task ID and reassemble executors with the new task
>>> >> > instances.
>>> >> > > > > >
>>> >> > > > > > We don't want or have to do that with Heron instances but we
>>> >> would
>>> >> > > need
>>> >> > > > > to
>>> >> > > > > > have some way to have a state change tied to the task (or
>>> >> routing
>>> >> > key
>>> >> > > > if
>>> >> > > > > we
>>> >> > > > > > go to the key range idea). For something like a word count
>>> you
>>> >> > might
>>> >> > > > save
>>> >> > > > > > counts using a nested map like: { routing key : {word :
>>> count
>>> >> }}.
>>> >> > The
>>> >> > > > > > routing key could be included in the Tuple instance.
>>> However,
>>> >> > whether
>>> >> > > > > this
>>> >> > > > > > pattern would work for more generic state cases I don't
>>> know?
>>> >> > > > > >
>>> >> > > > > > Tom Cooper
>>> >> > > > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
>>> >> > > > > > 
>>> >> > > > > >
>>> >> > > > > >
>>> >> > > > > > On Fri, 4 May 2018 at 15:54, Neng Lu 
>>> >> wrote:
>>> >> > > > > >
>>> >> > > > > > > +1 for this idea. As long as the predefined key space is
>>> large
>>> >> > > > enough,
>>> >> > > > > it
>>> >> > > > > > > should work for most of the cases.
>>> >> > > > > > >
>>> >> > > > > > > Based on my experience with topologies, I 

Re: Stateful updating and deterministic routing

2018-05-07 Thread Ning Wang
Thanks Karthik. The doc is not exactly the same but close enough.

It seems my doc is an internal one, so let's use your doc as the reference.
I will see if there are any major differences and comment.

On Sun, May 6, 2018 at 1:22 PM, Karthik Ramasamy  wrote:

> Here it is
>
> https://docs.google.com/document/d/1YDFNvLTX6Sg3WDrNFKiWLaJvuEtK4
> eyxEaA0w9cVlG4/edit#heading=h.d6uy2uxfs2xq
>
> cheers
> /karthik
>
>
> On Sun, May 6, 2018 at 8:20 AM, Bill Graham  wrote:
>
>> Can you share the doc please?
>>
>> On Sat, May 5, 2018 at 4:18 PM Ning Wang  wrote:
>>
>> > Thanks.
>> >
>> > Yeah I have read the design doc. It has a section for scaling and covers
>> > some designs but not reaching this level of details I am afraid.
>> >
>> > On Sat, May 5, 2018 at 9:45 AM, Bill Graham 
>> wrote:
>> >
>> >> The stateful processing design included a large section on scaling,
>> which
>> >> was intended to be done as a future phase. It's very similar to what's
>> >> being described. Sanjeev and I worked on it about a 1.5 years ago with
>> >> Maosong and it was in a google doc. Sanjeev do you have that design
>> doc? I
>> >> can't seem locate it.
>> >>
>> >> On Sat, May 5, 2018 at 12:03 AM, Ning Wang 
>> wrote:
>> >>
>> >> > If we go this way, we need key -> state map for each component so
>> that
>> >> the
>> >> > state data can be repartitioned.
>> >> >
>> >> > On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy <
>> kart...@streaml.io>
>> >> > wrote:
>> >> >
>> >> > > Instead - if it references
>> >> > >
>> >> > > topology name + component name + key range
>> >> > >
>> >> > > will it be better?
>> >> > >
>> >> > > cheers
>> >> > > /karthik
>> >> > >
>> >> > >
>> >> > > On Fri, May 4, 2018 at 11:23 PM, Ning Wang 
>> >> wrote:
>> >> > >
>> >> > > > Currently I think each Instance serializes the state object into
>> a
>> >> byte
>> >> > > > array and checkpoint manager saves the byte array into a file.
>> The
>> >> file
>> >> > > is
>> >> > > > referenced by topology name + component name + instance id.
>> >> > > >
>> >> > > > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy <
>> >> kart...@streaml.io>
>> >> > > > wrote:
>> >> > > >
>> >> > > > > I am not sure I understand why the state is tied to an
>> instance?
>> >> > > > >
>> >> > > > > cheers
>> >> > > > > /karthik
>> >> > > > >
>> >> > > > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper <
>> >> > tom.n.coo...@gmail.com>
>> >> > > > > wrote:
>> >> > > > >
>> >> > > > > > Yeah, state recovery is a bit more difficult with Heron's
>> >> > > architecture.
>> >> > > > > In
>> >> > > > > > Storm, the task IDs are not just values used for routing they
>> >> > > actually
>> >> > > > > > equate to a task instance within the executor. An executor
>> which
>> >> > > > > currently
>> >> > > > > > processes the keys 4-8 actually contains 5 task instances of
>> the
>> >> > same
>> >> > > > > > component. So for each task, they just save its state
>> attached
>> >> to
>> >> > the
>> >> > > > > > single task ID and reassemble executors with the new task
>> >> > instances.
>> >> > > > > >
>> >> > > > > > We don't want or have to do that with Heron instances but we
>> >> would
>> >> > > need
>> >> > > > > to
>> >> > > > > > have some way to have a state change tied to the task (or
>> >> routing
>> >> > key
>> >> > > > if
>> >> > > > > we
>> >> > > > > > go to the key range idea). For something like a word count
>> you
>> >> > might
>> >> > > > save
>> >> > > > > > counts using a nested map like: { routing key : {word : count
>> >> }}.
>> >> > The
>> >> > > > > > routing key could be included in the Tuple instance. However,
>> >> > whether
>> >> > > > > this
>> >> > > > > > pattern would work for more generic state cases I don't know?
>> >> > > > > >
>> >> > > > > > Tom Cooper
>> >> > > > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
>> >> > > > > > 
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > On Fri, 4 May 2018 at 15:54, Neng Lu 
>> >> wrote:
>> >> > > > > >
>> >> > > > > > > +1 for this idea. As long as the predefined key space is
>> large
>> >> > > > enough,
>> >> > > > > it
>> >> > > > > > > should work for most of the cases.
>> >> > > > > > >
>> >> > > > > > > Based on my experience with topologies, I never saw one
>> >> component
>> >> > > has
>> >> > > > > > more
>> >> > > > > > > than 1000 instances in a topology.
>> >> > > > > > >
>> >> > > > > > > For recovering states from an update, there will be some
>> >> problems
>> >> > > > > though.
>> >> > > > > > > Since the states stored in heron are strongly connected
>> with
>> >> each
>> >> > > > > > instance,
>> >> > > > > > > we either need to have
>> >> > > > > > > some resolver does the state repartitioning or stores
>> states
>> >> with
>> >> > > the
>> >> > > > > key
>> >> > > > > > > instead of with each instance.
>> >> > > > > > >
>> >> > > 

Re: Stateful updating and deterministic routing

2018-05-06 Thread Karthik Ramasamy
Here it is

https://docs.google.com/document/d/1YDFNvLTX6Sg3WDrNFKiWLaJvuEtK4eyxEaA0w9cVlG4/edit#heading=h.d6uy2uxfs2xq

cheers
/karthik


On Sun, May 6, 2018 at 8:20 AM, Bill Graham  wrote:

> Can you share the doc please?
>
> On Sat, May 5, 2018 at 4:18 PM Ning Wang  wrote:
>
> > Thanks.
> >
> > Yeah I have read the design doc. It has a section for scaling and covers
> > some designs but not reaching this level of details I am afraid.
> >
> > On Sat, May 5, 2018 at 9:45 AM, Bill Graham 
> wrote:
> >
> >> The stateful processing design included a large section on scaling,
> which
> >> was intended to be done as a future phase. It's very similar to what's
> >> being described. Sanjeev and I worked on it about a 1.5 years ago with
> >> Maosong and it was in a google doc. Sanjeev do you have that design
> doc? I
> >> can't seem locate it.
> >>
> >> On Sat, May 5, 2018 at 12:03 AM, Ning Wang 
> wrote:
> >>
> >> > If we go this way, we need key -> state map for each component so that
> >> the
> >> > state data can be repartitioned.
> >> >
> >> > On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy  >
> >> > wrote:
> >> >
> >> > > Instead - if it references
> >> > >
> >> > > topology name + component name + key range
> >> > >
> >> > > will it be better?
> >> > >
> >> > > cheers
> >> > > /karthik
> >> > >
> >> > >
> >> > > On Fri, May 4, 2018 at 11:23 PM, Ning Wang 
> >> wrote:
> >> > >
> >> > > > Currently I think each Instance serializes the state object into a
> >> byte
> >> > > > array and checkpoint manager saves the byte array into a file. The
> >> file
> >> > > is
> >> > > > referenced by topology name + component name + instance id.
> >> > > >
> >> > > > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy <
> >> kart...@streaml.io>
> >> > > > wrote:
> >> > > >
> >> > > > > I am not sure I understand why the state is tied to an instance?
> >> > > > >
> >> > > > > cheers
> >> > > > > /karthik
> >> > > > >
> >> > > > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper <
> >> > tom.n.coo...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Yeah, state recovery is a bit more difficult with Heron's
> >> > > architecture.
> >> > > > > In
> >> > > > > > Storm, the task IDs are not just values used for routing they
> >> > > actually
> >> > > > > > equate to a task instance within the executor. An executor
> which
> >> > > > > currently
> >> > > > > > processes the keys 4-8 actually contains 5 task instances of
> the
> >> > same
> >> > > > > > component. So for each task, they just save its state attached
> >> to
> >> > the
> >> > > > > > single task ID and reassemble executors with the new task
> >> > instances.
> >> > > > > >
> >> > > > > > We don't want or have to do that with Heron instances but we
> >> would
> >> > > need
> >> > > > > to
> >> > > > > > have some way to have a state change tied to the task (or
> >> routing
> >> > key
> >> > > > if
> >> > > > > we
> >> > > > > > go to the key range idea). For something like a word count you
> >> > might
> >> > > > save
> >> > > > > > counts using a nested map like: { routing key : {word : count
> >> }}.
> >> > The
> >> > > > > > routing key could be included in the Tuple instance. However,
> >> > whether
> >> > > > > this
> >> > > > > > pattern would work for more generic state cases I don't know?
> >> > > > > >
> >> > > > > > Tom Cooper
> >> > > > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> >> > > > > > 
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, 4 May 2018 at 15:54, Neng Lu 
> >> wrote:
> >> > > > > >
> >> > > > > > > +1 for this idea. As long as the predefined key space is
> large
> >> > > > enough,
> >> > > > > it
> >> > > > > > > should work for most of the cases.
> >> > > > > > >
> >> > > > > > > Based on my experience with topologies, I never saw one
> >> component
> >> > > has
> >> > > > > > more
> >> > > > > > > than 1000 instances in a topology.
> >> > > > > > >
> >> > > > > > > For recovering states from an update, there will be some
> >> problems
> >> > > > > though.
> >> > > > > > > Since the states stored in heron are strongly connected with
> >> each
> >> > > > > > instance,
> >> > > > > > > we either need to have
> >> > > > > > > some resolver does the state repartitioning or stores states
> >> with
> >> > > the
> >> > > > > key
> >> > > > > > > instead of with each instance.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy <
> >> > > > kramas...@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Thanks for sharing. I like the Storm approach
> >> > > > > > > >
> >> > > > > > > > - keeps the implementation simpler
> >> > > > > > > > - state is deterministic across restarts
> >> > > > > > > > - makes it easy to reason and debug
> >> > > > > > > >
> >> > > > > > 

Re: Stateful updating and deterministic routing

2018-05-06 Thread Bill Graham
Can you share the doc please?

On Sat, May 5, 2018 at 4:18 PM Ning Wang  wrote:

> Thanks.
>
> Yeah I have read the design doc. It has a section for scaling and covers
> some designs but not reaching this level of details I am afraid.
>
> On Sat, May 5, 2018 at 9:45 AM, Bill Graham  wrote:
>
>> The stateful processing design included a large section on scaling, which
>> was intended to be done as a future phase. It's very similar to what's
>> being described. Sanjeev and I worked on it about a 1.5 years ago with
>> Maosong and it was in a google doc. Sanjeev do you have that design doc? I
>> can't seem locate it.
>>
>> On Sat, May 5, 2018 at 12:03 AM, Ning Wang  wrote:
>>
>> > If we go this way, we need key -> state map for each component so that
>> the
>> > state data can be repartitioned.
>> >
>> > On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy 
>> > wrote:
>> >
>> > > Instead - if it references
>> > >
>> > > topology name + component name + key range
>> > >
>> > > will it be better?
>> > >
>> > > cheers
>> > > /karthik
>> > >
>> > >
>> > > On Fri, May 4, 2018 at 11:23 PM, Ning Wang 
>> wrote:
>> > >
>> > > > Currently I think each Instance serializes the state object into a
>> byte
>> > > > array and checkpoint manager saves the byte array into a file. The
>> file
>> > > is
>> > > > referenced by topology name + component name + instance id.
>> > > >
>> > > > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy <
>> kart...@streaml.io>
>> > > > wrote:
>> > > >
>> > > > > I am not sure I understand why the state is tied to an instance?
>> > > > >
>> > > > > cheers
>> > > > > /karthik
>> > > > >
>> > > > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper <
>> > tom.n.coo...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Yeah, state recovery is a bit more difficult with Heron's
>> > > architecture.
>> > > > > In
>> > > > > > Storm, the task IDs are not just values used for routing they
>> > > actually
>> > > > > > equate to a task instance within the executor. An executor which
>> > > > > currently
>> > > > > > processes the keys 4-8 actually contains 5 task instances of the
>> > same
>> > > > > > component. So for each task, they just save its state attached
>> to
>> > the
>> > > > > > single task ID and reassemble executors with the new task
>> > instances.
>> > > > > >
>> > > > > > We don't want or have to do that with Heron instances but we
>> would
>> > > need
>> > > > > to
>> > > > > > have some way to have a state change tied to the task (or
>> routing
>> > key
>> > > > if
>> > > > > we
>> > > > > > go to the key range idea). For something like a word count you
>> > might
>> > > > save
>> > > > > > counts using a nested map like: { routing key : {word : count
>> }}.
>> > The
>> > > > > > routing key could be included in the Tuple instance. However,
>> > whether
>> > > > > this
>> > > > > > pattern would work for more generic state cases I don't know?
>> > > > > >
>> > > > > > Tom Cooper
>> > > > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
>> > > > > > 
>> > > > > >
>> > > > > >
>> > > > > > On Fri, 4 May 2018 at 15:54, Neng Lu 
>> wrote:
>> > > > > >
>> > > > > > > +1 for this idea. As long as the predefined key space is large
>> > > > enough,
>> > > > > it
>> > > > > > > should work for most of the cases.
>> > > > > > >
>> > > > > > > Based on my experience with topologies, I never saw one
>> component
>> > > has
>> > > > > > more
>> > > > > > > than 1000 instances in a topology.
>> > > > > > >
>> > > > > > > For recovering states from an update, there will be some
>> problems
>> > > > > though.
>> > > > > > > Since the states stored in heron are strongly connected with
>> each
>> > > > > > instance,
>> > > > > > > we either need to have
>> > > > > > > some resolver does the state repartitioning or stores states
>> with
>> > > the
>> > > > > key
>> > > > > > > instead of with each instance.
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy <
>> > > > kramas...@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Thanks for sharing. I like the Storm approach
>> > > > > > > >
>> > > > > > > > - keeps the implementation simpler
>> > > > > > > > - state is deterministic across restarts
>> > > > > > > > - makes it easy to reason and debug
>> > > > > > > >
>> > > > > > > > The hard limit is not a problem at all since most of the
>> > > topologies
>> > > > > > will
>> > > > > > > > be never that big.
>> > > > > > > > If you can handle Twitter topologies cleanly, it is more
>> that
>> > > > > > sufficient
>> > > > > > > I
>> > > > > > > > believe.
>> > > > > > > >
>> > > > > > > > cheers
>> > > > > > > > /karthik
>> > > > > > > >
>> > > > > > > > > On May 4, 2018, at 2:31 PM, Thomas Cooper <
>> > > > tom.n.coo...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > >

Re: Stateful updating and deterministic routing

2018-05-05 Thread Ning Wang
Thanks.

Yeah, I have read the design doc. It has a section for scaling and covers
some designs, but it does not reach this level of detail, I am afraid.

On Sat, May 5, 2018 at 9:45 AM, Bill Graham  wrote:

> The stateful processing design included a large section on scaling, which
> was intended to be done as a future phase. It's very similar to what's
> being described. Sanjeev and I worked on it about a 1.5 years ago with
> Maosong and it was in a google doc. Sanjeev do you have that design doc? I
> can't seem locate it.
>
> On Sat, May 5, 2018 at 12:03 AM, Ning Wang  wrote:
>
> > If we go this way, we need key -> state map for each component so that
> the
> > state data can be repartitioned.
> >
> > On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy 
> > wrote:
> >
> > > Instead - if it references
> > >
> > > topology name + component name + key range
> > >
> > > will it be better?
> > >
> > > cheers
> > > /karthik
> > >
> > >
> > > On Fri, May 4, 2018 at 11:23 PM, Ning Wang 
> wrote:
> > >
> > > > Currently I think each Instance serializes the state object into a
> byte
> > > > array and checkpoint manager saves the byte array into a file. The
> file
> > > is
> > > > referenced by topology name + component name + instance id.
> > > >
> > > > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy <
> kart...@streaml.io>
> > > > wrote:
> > > >
> > > > > I am not sure I understand why the state is tied to an instance?
> > > > >
> > > > > cheers
> > > > > /karthik
> > > > >
> > > > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper <
> > tom.n.coo...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Yeah, state recovery is a bit more difficult with Heron's
> > > architecture.
> > > > > In
> > > > > > Storm, the task IDs are not just values used for routing they
> > > actually
> > > > > > equate to a task instance within the executor. An executor which
> > > > > currently
> > > > > > processes the keys 4-8 actually contains 5 task instances of the
> > same
> > > > > > component. So for each task, they just save its state attached to
> > the
> > > > > > single task ID and reassemble executors with the new task
> > instances.
> > > > > >
> > > > > > We don't want or have to do that with Heron instances but we
> would
> > > need
> > > > > to
> > > > > > have some way to have a state change tied to the task (or routing
> > key
> > > > if
> > > > > we
> > > > > > go to the key range idea). For something like a word count you
> > might
> > > > save
> > > > > > counts using a nested map like: { routing key : {word : count }}.
> > The
> > > > > > routing key could be included in the Tuple instance. However,
> > whether
> > > > > this
> > > > > > pattern would work for more generic state cases I don't know?
> > > > > >
> > > > > > Tom Cooper
> > > > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> > > > > > 
> > > > > >
> > > > > >
> > > > > > On Fri, 4 May 2018 at 15:54, Neng Lu  wrote:
> > > > > >
> > > > > > > +1 for this idea. As long as the predefined key space is large
> > > > enough,
> > > > > it
> > > > > > > should work for most of the cases.
> > > > > > >
> > > > > > > Based on my experience with topologies, I never saw one
> component
> > > has
> > > > > > more
> > > > > > > than 1000 instances in a topology.
> > > > > > >
> > > > > > > For recovering states from an update, there will be some
> problems
> > > > > though.
> > > > > > > Since the states stored in heron are strongly connected with
> each
> > > > > > instance,
> > > > > > > we either need to have
> > > > > > > some resolver does the state repartitioning or stores states
> with
> > > the
> > > > > key
> > > > > > > instead of with each instance.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy <
> > > > kramas...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for sharing. I like the Storm approach
> > > > > > > >
> > > > > > > > - keeps the implementation simpler
> > > > > > > > - state is deterministic across restarts
> > > > > > > > - makes it easy to reason and debug
> > > > > > > >
> > > > > > > > The hard limit is not a problem at all since most of the
> > > topologies
> > > > > > will
> > > > > > > > be never that big.
> > > > > > > > If you can handle Twitter topologies cleanly, it is more that
> > > > > > sufficient
> > > > > > > I
> > > > > > > > believe.
> > > > > > > >
> > > > > > > > cheers
> > > > > > > > /karthik
> > > > > > > >
> > > > > > > > > On May 4, 2018, at 2:31 PM, Thomas Cooper <
> > > > tom.n.coo...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > A while ago I emailed about the issue of how fields (key)
> > > grouped
> > > > > > > routing
> > > > > > > > > in Heron was not consistent across an update and how this
> > makes
> > > > > > > > preserving

Re: Stateful updating and deterministic routing

2018-05-05 Thread Bill Graham
The stateful processing design included a large section on scaling, which
was intended to be done as a future phase. It's very similar to what's
being described. Sanjeev and I worked on it about 1.5 years ago with
Maosong and it was in a Google doc. Sanjeev, do you have that design doc? I
can't seem to locate it.

On Sat, May 5, 2018 at 12:03 AM, Ning Wang  wrote:

> If we go this way, we need key -> state map for each component so that the
> state data can be repartitioned.
>
> On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy 
> wrote:
>
> > Instead - if it references
> >
> > topology name + component name + key range
> >
> > will it be better?
> >
> > cheers
> > /karthik
> >
> >
> > On Fri, May 4, 2018 at 11:23 PM, Ning Wang  wrote:
> >
> > > Currently I think each Instance serializes the state object into a byte
> > > array and checkpoint manager saves the byte array into a file. The file
> > is
> > > referenced by topology name + component name + instance id.
> > >
> > > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy 
> > > wrote:
> > >
> > > > I am not sure I understand why the state is tied to an instance?
> > > >
> > > > cheers
> > > > /karthik
> > > >
> > > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper <
> tom.n.coo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Yeah, state recovery is a bit more difficult with Heron's
> > architecture.
> > > > In
> > > > > Storm, the task IDs are not just values used for routing they
> > actually
> > > > > equate to a task instance within the executor. An executor which
> > > > currently
> > > > > processes the keys 4-8 actually contains 5 task instances of the
> same
> > > > > component. So for each task, they just save its state attached to
> the
> > > > > single task ID and reassemble executors with the new task
> instances.
> > > > >
> > > > > We don't want or have to do that with Heron instances but we would
> > need
> > > > to
> > > > > have some way to have a state change tied to the task (or routing
> key
> > > if
> > > > we
> > > > > go to the key range idea). For something like a word count you
> might
> > > save
> > > > > counts using a nested map like: { routing key : {word : count }}.
> The
> > > > > routing key could be included in the Tuple instance. However,
> whether
> > > > this
> > > > > pattern would work for more generic state cases I don't know?
> > > > >
> > > > > Tom Cooper
> > > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> > > > > 
> > > > >
> > > > >
> > > > > On Fri, 4 May 2018 at 15:54, Neng Lu  wrote:
> > > > >
> > > > > > +1 for this idea. As long as the predefined key space is large
> > > enough,
> > > > it
> > > > > > should work for most of the cases.
> > > > > >
> > > > > > Based on my experience with topologies, I never saw one component
> > has
> > > > > more
> > > > > > than 1000 instances in a topology.
> > > > > >
> > > > > > For recovering states from an update, there will be some problems
> > > > though.
> > > > > > Since the states stored in heron are strongly connected with each
> > > > > instance,
> > > > > > we either need to have
> > > > > > some resolver does the state repartitioning or stores states with
> > the
> > > > key
> > > > > > instead of with each instance.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy <
> > > kramas...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks for sharing. I like the Storm approach
> > > > > > >
> > > > > > > - keeps the implementation simpler
> > > > > > > - state is deterministic across restarts
> > > > > > > - makes it easy to reason and debug
> > > > > > >
> > > > > > > The hard limit is not a problem at all since most of the
> > topologies
> > > > > will
> > > > > > > be never that big.
> > > > > > > If you can handle Twitter topologies cleanly, it is more that
> > > > > sufficient
> > > > > > I
> > > > > > > believe.
> > > > > > >
> > > > > > > cheers
> > > > > > > /karthik
> > > > > > >
> > > > > > > > On May 4, 2018, at 2:31 PM, Thomas Cooper <
> > > tom.n.coo...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > A while ago I emailed about the issue of how fields (key)
> > grouped
> > > > > > routing
> > > > > > > > in Heron was not consistent across an update and how this
> makes
> > > > > > > preserving
> > > > > > > > state across an update very difficult and also makes it
> > > > > > > > difficult/impossible to analyse or predict tuple flows
> through
> > a
> > > > > > > > current/proposed topology physical plan.
> > > > > > > >
> > > > > > > > I suggested adopting Storms approach of pre-defining a
> routing
> > > key
> > > > > > > > space for each component (eg 0-999), so that instead of an
> > > instance
> > > > > > > having
> > > > > > > > a single task id that gets reset at every update (eg 10) it
> > 

Re: Stateful updating and deterministic routing

2018-05-05 Thread Ning Wang
If we go this way, we need a key -> state map for each component so that the
state data can be repartitioned.
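
A minimal sketch of what that per-component map could look like (class and
method names here are hypothetical, not an existing Heron API); keeping the
state keyed by routing key lets an update slice it by key range:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: component state kept as routingKey -> state, so it
// can be split and reassigned when the component's parallelism changes.
public class KeyedComponentState<S> {
  private final Map<Integer, S> stateByKey = new HashMap<>();

  public void put(int routingKey, S state) {
    stateByKey.put(routingKey, state);
  }

  public S get(int routingKey) {
    return stateByKey.get(routingKey);
  }

  // Extract only the keys an instance will own after an update.
  public Map<Integer, S> slice(int firstKey, int lastKey) {
    Map<Integer, S> owned = new HashMap<>();
    for (Map.Entry<Integer, S> e : stateByKey.entrySet()) {
      if (e.getKey() >= firstKey && e.getKey() <= lastKey) {
        owned.put(e.getKey(), e.getValue());
      }
    }
    return owned;
  }
}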

On Fri, May 4, 2018 at 11:44 PM, Karthik Ramasamy 
wrote:

> Instead - if it references
>
> topology name + component name + key range
>
> will it be better?
>
> cheers
> /karthik
>
>
> On Fri, May 4, 2018 at 11:23 PM, Ning Wang  wrote:
>
> > Currently I think each Instance serializes the state object into a byte
> > array and checkpoint manager saves the byte array into a file. The file
> is
> > referenced by topology name + component name + instance id.
> >
> > On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy 
> > wrote:
> >
> > > I am not sure I understand why the state is tied to an instance?
> > >
> > > cheers
> > > /karthik
> > >
> > > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper 
> > > wrote:
> > >
> > > > Yeah, state recovery is a bit more difficult with Heron's
> architecture.
> > > In
> > > > Storm, the task IDs are not just values used for routing they
> actually
> > > > equate to a task instance within the executor. An executor which
> > > currently
> > > > processes the keys 4-8 actually contains 5 task instances of the same
> > > > component. So for each task, they just save its state attached to the
> > > > single task ID and reassemble executors with the new task instances.
> > > >
> > > > We don't want or have to do that with Heron instances but we would
> need
> > > to
> > > > have some way to have a state change tied to the task (or routing key
> > if
> > > we
> > > > go to the key range idea). For something like a word count you might
> > save
> > > > counts using a nested map like: { routing key : {word : count }}. The
> > > > routing key could be included in the Tuple instance. However, whether
> > > this
> > > > pattern would work for more generic state cases I don't know?
> > > >
> > > > Tom Cooper
> > > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> > > > 
> > > >
> > > >
> > > > On Fri, 4 May 2018 at 15:54, Neng Lu  wrote:
> > > >
> > > > > +1 for this idea. As long as the predefined key space is large
> > enough,
> > > it
> > > > > should work for most of the cases.
> > > > >
> > > > > Based on my experience with topologies, I never saw one component
> has
> > > > more
> > > > > than 1000 instances in a topology.
> > > > >
> > > > > For recovering states from an update, there will be some problems
> > > though.
> > > > > Since the states stored in heron are strongly connected with each
> > > > instance,
> > > > > we either need to have
> > > > > some resolver does the state repartitioning or stores states with
> the
> > > key
> > > > > instead of with each instance.
> > > > >
> > > > >
> > > > >
> > > > > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy <
> > kramas...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks for sharing. I like the Storm approach
> > > > > >
> > > > > > - keeps the implementation simpler
> > > > > > - state is deterministic across restarts
> > > > > > - makes it easy to reason and debug
> > > > > >
> > > > > > The hard limit is not a problem at all since most of the
> topologies
> > > > will
> > > > > > be never that big.
> > > > > > If you can handle Twitter topologies cleanly, it is more that
> > > > sufficient
> > > > > I
> > > > > > believe.
> > > > > >
> > > > > > cheers
> > > > > > /karthik
> > > > > >
> > > > > > > On May 4, 2018, at 2:31 PM, Thomas Cooper <
> > tom.n.coo...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > A while ago I emailed about the issue of how fields (key)
> grouped
> > > > > routing
> > > > > > > in Heron was not consistent across an update and how this makes
> > > > > > preserving
> > > > > > > state across an update very difficult and also makes it
> > > > > > > difficult/impossible to analyse or predict tuple flows through
> a
> > > > > > > current/proposed topology physical plan.
> > > > > > >
> > > > > > > I suggested adopting Storms approach of pre-defining a routing
> > key
> > > > > > > space for each component (eg 0-999), so that instead of an
> > instance
> > > > > > having
> > > > > > > a single task id that gets reset at every update (eg 10) it
> has a
> > > > range
> > > > > > of
> > > > > > > id's (eg 10-16) that changes depending on the parallelism of
> the
> > > > > > component.
> > > > > > > This has the advantage that a key will always hash to the same
> > task
> > > > ID
> > > > > > for
> > > > > > > the lifetime of the topology. Meaning recovering state for an
> > > > instance
> > > > > > > after a crash or update is just a case of pulling the state
> > linked
> > > to
> > > > > the
> > > > > > > keys in its task ID range.
> > > > > > >
> > > > > > > I know the above proposal has issues, not least of all placing
> a
> > > hard
> > > > > > upper
> > > > > > > limit on the scale out of a component, and 

Re: Stateful updating and deterministic routing

2018-05-05 Thread Karthik Ramasamy
Instead, if it references

topology name + component name + key range

will it be better?
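
A rough illustration of why it might help (purely hypothetical naming, not an
existing checkpoint-manager API): if the reference is built from the key
range rather than the instance id, an instance that owns keys 10-16 after an
update can pull one entry per key, no matter how many instances held those
keys before.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of key-scoped checkpoint references.
public final class KeyRangeCheckpointRef {
  static String forKey(String topology, String component, int routingKey) {
    return topology + "/" + component + "/key-" + routingKey;
  }

  // References for the key range an instance owns after an update; they do
  // not depend on how many instances existed when the state was written.
  static List<String> forRange(String topology, String component,
                               int firstKey, int lastKey) {
    List<String> refs = new ArrayList<>();
    for (int k = firstKey; k <= lastKey; k++) {
      refs.add(forKey(topology, component, k));
    }
    return refs;
  }
}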

cheers
/karthik


On Fri, May 4, 2018 at 11:23 PM, Ning Wang  wrote:

> Currently I think each Instance serializes the state object into a byte
> array and checkpoint manager saves the byte array into a file. The file is
> referenced by topology name + component name + instance id.
>
> On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy 
> wrote:
>
> > I am not sure I understand why the state is tied to an instance?
> >
> > cheers
> > /karthik
> >
> > On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper 
> > wrote:
> >
> > > Yeah, state recovery is a bit more difficult with Heron's architecture.
> > In
> > > Storm, the task IDs are not just values used for routing they actually
> > > equate to a task instance within the executor. An executor which
> > currently
> > > processes the keys 4-8 actually contains 5 task instances of the same
> > > component. So for each task, they just save its state attached to the
> > > single task ID and reassemble executors with the new task instances.
> > >
> > > We don't want or have to do that with Heron instances but we would need
> > to
> > > have some way to have a state change tied to the task (or routing key
> if
> > we
> > > go to the key range idea). For something like a word count you might
> save
> > > counts using a nested map like: { routing key : {word : count }}. The
> > > routing key could be included in the Tuple instance. However, whether
> > this
> > > pattern would work for more generic state cases I don't know?
> > >
> > > Tom Cooper
> > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> > > 
> > >
> > >
> > > On Fri, 4 May 2018 at 15:54, Neng Lu  wrote:
> > >
> > > > +1 for this idea. As long as the predefined key space is large
> enough,
> > it
> > > > should work for most of the cases.
> > > >
> > > > Based on my experience with topologies, I never saw one component has
> > > more
> > > > than 1000 instances in a topology.
> > > >
> > > > For recovering states from an update, there will be some problems
> > though.
> > > > Since the states stored in heron are strongly connected with each
> > > instance,
> > > > we either need to have
> > > > some resolver does the state repartitioning or stores states with the
> > key
> > > > instead of with each instance.
> > > >
> > > >
> > > >
> > > > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy <
> kramas...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for sharing. I like the Storm approach
> > > > >
> > > > > - keeps the implementation simpler
> > > > > - state is deterministic across restarts
> > > > > - makes it easy to reason and debug
> > > > >
> > > > > The hard limit is not a problem at all since most of the topologies
> > > will
> > > > > be never that big.
> > > > > If you can handle Twitter topologies cleanly, it is more that
> > > sufficient
> > > > I
> > > > > believe.
> > > > >
> > > > > cheers
> > > > > /karthik
> > > > >
> > > > > > On May 4, 2018, at 2:31 PM, Thomas Cooper <
> tom.n.coo...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > A while ago I emailed about the issue of how fields (key) grouped
> > > > routing
> > > > > > in Heron was not consistent across an update and how this makes
> > > > > preserving
> > > > > > state across an update very difficult and also makes it
> > > > > > difficult/impossible to analyse or predict tuple flows through a
> > > > > > current/proposed topology physical plan.
> > > > > >
> > > > > > I suggested adopting Storms approach of pre-defining a routing
> key
> > > > > > space for each component (eg 0-999), so that instead of an
> instance
> > > > > having
> > > > > > a single task id that gets reset at every update (eg 10) it has a
> > > range
> > > > > of
> > > > > > id's (eg 10-16) that changes depending on the parallelism of the
> > > > > component.
> > > > > > This has the advantage that a key will always hash to the same
> task
> > > ID
> > > > > for
> > > > > > the lifetime of the topology. Meaning recovering state for an
> > > instance
> > > > > > after a crash or update is just a case of pulling the state
> linked
> > to
> > > > the
> > > > > > keys in its task ID range.
> > > > > >
> > > > > > I know the above proposal has issues, not least of all placing a
> > hard
> > > > > upper
> > > > > > limit on the scale out of a component, and that some alternative
> > > ideas
> > > > > are
> > > > > > being floated for solving the stateful update issue. However, I
> > just
> > > > > wanted
> > > > > > to throw some more weight behind the Storm approach. There was a
> > > recent
> > > > > > paper about high-performance network load balancing
> > > > > >  > > > > datacenter-load-balancing-with-beamer/>that
> > > > > > describes an approach using a fixed key space 

Re: Stateful updating and deterministic routing

2018-05-05 Thread Ning Wang
Currently, I think each Instance serializes the state object into a byte
array and the checkpoint manager saves the byte array into a file. The file
is referenced by topology name + component name + instance id.
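
For reference, a rough sketch of how that instance-scoped reference might be
built (a hypothetical helper, not the actual checkpoint-manager code); since
the instance id is part of the name, the saved state becomes hard to find
again once an update hands the same keys to a different instance:

// Hypothetical sketch of the current, instance-scoped checkpoint reference.
public final class InstanceCheckpointRef {
  static String of(String topologyName, String componentName, int instanceId) {
    // e.g. "wordcount/counter/instance-3"
    return topologyName + "/" + componentName + "/instance-" + instanceId;
  }
}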

On Fri, May 4, 2018 at 11:10 PM, Karthik Ramasamy 
wrote:

> I am not sure I understand why the state is tied to an instance?
>
> cheers
> /karthik
>
> On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper 
> wrote:
>
> > Yeah, state recovery is a bit more difficult with Heron's architecture.
> In
> > Storm, the task IDs are not just values used for routing they actually
> > equate to a task instance within the executor. An executor which
> currently
> > processes the keys 4-8 actually contains 5 task instances of the same
> > component. So for each task, they just save its state attached to the
> > single task ID and reassemble executors with the new task instances.
> >
> > We don't want or have to do that with Heron instances but we would need
> to
> > have some way to have a state change tied to the task (or routing key if
> we
> > go to the key range idea). For something like a word count you might save
> > counts using a nested map like: { routing key : {word : count }}. The
> > routing key could be included in the Tuple instance. However, whether
> this
> > pattern would work for more generic state cases I don't know?
> >
> > Tom Cooper
> > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> > 
> >
> >
> > On Fri, 4 May 2018 at 15:54, Neng Lu  wrote:
> >
> > > +1 for this idea. As long as the predefined key space is large enough,
> it
> > > should work for most of the cases.
> > >
> > > Based on my experience with topologies, I never saw one component has
> > more
> > > than 1000 instances in a topology.
> > >
> > > For recovering states from an update, there will be some problems
> though.
> > > Since the states stored in heron are strongly connected with each
> > instance,
> > > we either need to have
> > > some resolver does the state repartitioning or stores states with the
> key
> > > instead of with each instance.
> > >
> > >
> > >
> > > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy 
> > > wrote:
> > >
> > > > Thanks for sharing. I like the Storm approach
> > > >
> > > > - keeps the implementation simpler
> > > > - state is deterministic across restarts
> > > > - makes it easy to reason and debug
> > > >
> > > > The hard limit is not a problem at all since most of the topologies
> > will
> > > > be never that big.
> > > > If you can handle Twitter topologies cleanly, it is more that
> > sufficient
> > > I
> > > > believe.
> > > >
> > > > cheers
> > > > /karthik
> > > >
> > > > > On May 4, 2018, at 2:31 PM, Thomas Cooper 
> > > > wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > A while ago I emailed about the issue of how fields (key) grouped
> > > routing
> > > > > in Heron was not consistent across an update and how this makes
> > > > preserving
> > > > > state across an update very difficult and also makes it
> > > > > difficult/impossible to analyse or predict tuple flows through a
> > > > > current/proposed topology physical plan.
> > > > >
> > > > > I suggested adopting Storms approach of pre-defining a routing key
> > > > > space for each component (eg 0-999), so that instead of an instance
> > > > having
> > > > > a single task id that gets reset at every update (eg 10) it has a
> > range
> > > > of
> > > > > id's (eg 10-16) that changes depending on the parallelism of the
> > > > component.
> > > > > This has the advantage that a key will always hash to the same task
> > ID
> > > > for
> > > > > the lifetime of the topology. Meaning recovering state for an
> > instance
> > > > > after a crash or update is just a case of pulling the state linked
> to
> > > the
> > > > > keys in its task ID range.
> > > > >
> > > > > I know the above proposal has issues, not least of all placing a
> hard
> > > > upper
> > > > > limit on the scale out of a component, and that some alternative
> > ideas
> > > > are
> > > > > being floated for solving the stateful update issue. However, I
> just
> > > > wanted
> > > > > to throw some more weight behind the Storm approach. There was a
> > recent
> > > > > paper about high-performance network load balancing
> > > > >  > > > datacenter-load-balancing-with-beamer/>that
> > > > > describes an approach using a fixed key space similar to Storm's
> (see
> > > the
> > > > > section called Stable Hashing - they assign a range 100x the
> expected
> > > > > connection pool size - which we could do with heron to prevent ever
> > > > hitting
> > > > > the upper scaling limit). Also, this new load balancer, Beamer,
> > claims
> > > to
> > > > > be twice as fast as Google's Maglev
> > > > >  > > > 

Re: Stateful updating and deterministic routing

2018-05-05 Thread Karthik Ramasamy
I am not sure I understand why the state is tied to an instance.

cheers
/karthik

On Fri, May 4, 2018 at 4:36 PM, Thomas Cooper 
wrote:

> Yeah, state recovery is a bit more difficult with Heron's architecture. In
> Storm, the task IDs are not just values used for routing they actually
> equate to a task instance within the executor. An executor which currently
> processes the keys 4-8 actually contains 5 task instances of the same
> component. So for each task, they just save its state attached to the
> single task ID and reassemble executors with the new task instances.
>
> We don't want or have to do that with Heron instances but we would need to
> have some way to have a state change tied to the task (or routing key if we
> go to the key range idea). For something like a word count you might save
> counts using a nested map like: { routing key : {word : count }}. The
> routing key could be included in the Tuple instance. However, whether this
> pattern would work for more generic state cases I don't know?
>
> Tom Cooper
> W: www.tomcooper.org.uk  | Twitter: @tomncooper
> 
>
>
> On Fri, 4 May 2018 at 15:54, Neng Lu  wrote:
>
> > +1 for this idea. As long as the predefined key space is large enough, it
> > should work for most of the cases.
> >
> > Based on my experience with topologies, I never saw one component has
> more
> > than 1000 instances in a topology.
> >
> > For recovering states from an update, there will be some problems though.
> > Since the states stored in heron are strongly connected with each
> instance,
> > we either need to have
> > some resolver does the state repartitioning or stores states with the key
> > instead of with each instance.
> >
> >
> >
> > On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy 
> > wrote:
> >
> > > Thanks for sharing. I like the Storm approach
> > >
> > > - keeps the implementation simpler
> > > - state is deterministic across restarts
> > > - makes it easy to reason and debug
> > >
> > > The hard limit is not a problem at all since most of the topologies
> will
> > > be never that big.
> > > If you can handle Twitter topologies cleanly, it is more that
> sufficient
> > I
> > > believe.
> > >
> > > cheers
> > > /karthik
> > >
> > > > On May 4, 2018, at 2:31 PM, Thomas Cooper 
> > > wrote:
> > > >
> > > > Hi all,
> > > >
> > > > A while ago I emailed about the issue of how fields (key) grouped
> > routing
> > > > in Heron was not consistent across an update and how this makes
> > > preserving
> > > > state across an update very difficult and also makes it
> > > > difficult/impossible to analyse or predict tuple flows through a
> > > > current/proposed topology physical plan.
> > > >
> > > > I suggested adopting Storms approach of pre-defining a routing key
> > > > space for each component (eg 0-999), so that instead of an instance
> > > having
> > > > a single task id that gets reset at every update (eg 10) it has a
> range
> > > of
> > > > id's (eg 10-16) that changes depending on the parallelism of the
> > > component.
> > > > This has the advantage that a key will always hash to the same task
> ID
> > > for
> > > > the lifetime of the topology. Meaning recovering state for an
> instance
> > > > after a crash or update is just a case of pulling the state linked to
> > the
> > > > keys in its task ID range.
> > > >
> > > > I know the above proposal has issues, not least of all placing a hard
> > > upper
> > > > limit on the scale out of a component, and that some alternative
> ideas
> > > are
> > > > being floated for solving the stateful update issue. However, I just
> > > wanted
> > > > to throw some more weight behind the Storm approach. There was a
> recent
> > > > paper about high-performance network load balancing
> > > >  > > datacenter-load-balancing-with-beamer/>that
> > > > describes an approach using a fixed key space similar to Storm's (see
> > the
> > > > section called Stable Hashing - they assign a range 100x the expected
> > > > connection pool size - which we could do with heron to prevent ever
> > > hitting
> > > > the upper scaling limit). Also, this new load balancer, Beamer,
> claims
> > to
> > > > be twice as fast as Google's Maglev
> > > >  > > reliable-software-network-load-balancer/>
> > > > which again uses a pre-defined keyspace and ID ranges to create
> look-up
> > > > tables deterministically.
> > > >
> > > > I know a load balancer is a different beast to a stream grouping but
> > > there
> > > > are some interesting ideas in those papers (The links point to
> summary
> > > blog
> > > > posts so you don't have to read the whole paper).
> > > >
> > > > Anyway, I just thought I would those papers out there and see what
> > people
> > > > think.
> > > >
> > > > Tom Cooper
> > > > W: www.tomcooper.org.uk  | 

Re: Stateful updating and deterministic routing

2018-05-04 Thread Thomas Cooper
Yeah, state recovery is a bit more difficult with Heron's architecture. In
Storm, the task IDs are not just values used for routing; they actually
equate to a task instance within the executor. An executor which currently
processes the keys 4-8 actually contains 5 task instances of the same
component. So for each task, Storm just saves its state attached to the
single task ID and reassembles executors with the new task instances.

We don't want or have to do that with Heron instances, but we would need
some way to tie a state change to the task (or to the routing key, if we go
with the key-range idea). For something like a word count, you might save
counts using a nested map like { routing key : { word : count } }. The
routing key could be included in the Tuple instance. However, I don't know
whether this pattern would work for more generic state cases.
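
As a concrete sketch of that nested-map pattern (illustrative only; the
routing-key field on the tuple and the class below are assumptions, not
existing Heron APIs), a word-count bolt could keep its counts bucketed by
routing key so each bucket can be handed to whichever instance owns that key
after an update:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: word counts bucketed by routing key, so the state can
// be repartitioned by key range instead of being tied to one instance id.
public class KeyedWordCountState {
  // routing key -> (word -> count)
  private final Map<Integer, Map<String, Long>> counts = new HashMap<>();

  // Assumes the routing key is carried on (or derivable from) each tuple.
  public void increment(int routingKey, String word) {
    counts.computeIfAbsent(routingKey, k -> new HashMap<>())
          .merge(word, 1L, Long::sum);
  }

  // After an update, the new owner of a routing key pulls just that bucket.
  public Map<String, Long> bucketFor(int routingKey) {
    return counts.getOrDefault(routingKey, new HashMap<>());
  }
}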

Tom Cooper
W: www.tomcooper.org.uk  | Twitter: @tomncooper



On Fri, 4 May 2018 at 15:54, Neng Lu  wrote:

> +1 for this idea. As long as the predefined key space is large enough, it
> should work for most of the cases.
>
> Based on my experience with topologies, I never saw one component has more
> than 1000 instances in a topology.
>
> For recovering states from an update, there will be some problems though.
> Since the states stored in heron are strongly connected with each instance,
> we either need to have
> some resolver does the state repartitioning or stores states with the key
> instead of with each instance.
>
>
>
> On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy 
> wrote:
>
> > Thanks for sharing. I like the Storm approach
> >
> > - keeps the implementation simpler
> > - state is deterministic across restarts
> > - makes it easy to reason and debug
> >
> > The hard limit is not a problem at all since most of the topologies will
> > be never that big.
> > If you can handle Twitter topologies cleanly, it is more that sufficient
> I
> > believe.
> >
> > cheers
> > /karthik
> >
> > > On May 4, 2018, at 2:31 PM, Thomas Cooper 
> > wrote:
> > >
> > > Hi all,
> > >
> > > A while ago I emailed about the issue of how fields (key) grouped
> routing
> > > in Heron was not consistent across an update and how this makes
> > preserving
> > > state across an update very difficult and also makes it
> > > difficult/impossible to analyse or predict tuple flows through a
> > > current/proposed topology physical plan.
> > >
> > > I suggested adopting Storms approach of pre-defining a routing key
> > > space for each component (eg 0-999), so that instead of an instance
> > having
> > > a single task id that gets reset at every update (eg 10) it has a range
> > of
> > > id's (eg 10-16) that changes depending on the parallelism of the
> > component.
> > > This has the advantage that a key will always hash to the same task ID
> > for
> > > the lifetime of the topology. Meaning recovering state for an instance
> > > after a crash or update is just a case of pulling the state linked to
> the
> > > keys in its task ID range.
> > >
> > > I know the above proposal has issues, not least of all placing a hard
> > upper
> > > limit on the scale out of a component, and that some alternative ideas
> > are
> > > being floated for solving the stateful update issue. However, I just
> > wanted
> > > to throw some more weight behind the Storm approach. There was a recent
> > > paper about high-performance network load balancing
> > >  > datacenter-load-balancing-with-beamer/>that
> > > describes an approach using a fixed key space similar to Storm's (see
> the
> > > section called Stable Hashing - they assign a range 100x the expected
> > > connection pool size - which we could do with Heron to prevent ever
> > hitting
> > > the upper scaling limit). Also, this new load balancer, Beamer, claims
> to
> > > be twice as fast as Google's Maglev
> > >  > reliable-software-network-load-balancer/>
> > > which again uses a pre-defined keyspace and ID ranges to create look-up
> > > tables deterministically.
> > >
> > > I know a load balancer is a different beast to a stream grouping but
> > there
> > > are some interesting ideas in those papers (The links point to summary
> > blog
> > > posts so you don't have to read the whole paper).
> > >
> > > Anyway, I just thought I would throw those papers out there and see what
> people
> > > think.
> > >
> > > Tom Cooper
> > > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> > > 
> >
> >
>


Re: Stateful updating and deterministic routing

2018-05-04 Thread Neng Lu
+1 for this idea. As long as the predefined key space is large enough, it
should work for most cases.

Based on my experience with topologies, I have never seen a component with
more than 1000 instances in a topology.

For recovering state after an update, there will be some problems though.
Since the state stored in Heron is tied to each instance, we either need a
resolver that repartitions the state, or we need to store state by key
instead of by instance.
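
A sketch of the resolver option, assuming checkpoints are saved per instance
but internally keyed by routing key (everything here is illustrative, not an
existing Heron API):

    import java.util.HashMap;
    import java.util.Map;

    // Regroup state saved under old instances into the buckets that the new
    // physical plan expects, given a routing-key -> new-instance assignment.
    public final class StateRepartitioner {

      // stateByOldInstance: old instance id -> (routing key -> state blob)
      // newInstanceForKey:  routing key -> new instance id
      public static Map<Integer, Map<Integer, byte[]>> repartition(
          Map<Integer, Map<Integer, byte[]>> stateByOldInstance,
          Map<Integer, Integer> newInstanceForKey) {
        Map<Integer, Map<Integer, byte[]>> result = new HashMap<>();
        for (Map<Integer, byte[]> keyedState : stateByOldInstance.values()) {
          for (Map.Entry<Integer, byte[]> e : keyedState.entrySet()) {
            int target = newInstanceForKey.get(e.getKey());
            result.computeIfAbsent(target, t -> new HashMap<>())
                .put(e.getKey(), e.getValue());
          }
        }
        return result;
      }
    }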



On Fri, May 4, 2018 at 3:01 PM, Karthik Ramasamy 
wrote:

> Thanks for sharing. I like the Storm approach:
>
> - keeps the implementation simpler
> - state is deterministic across restarts
> - makes it easy to reason about and debug
>
> The hard limit is not a problem at all since most topologies will never be
> that big.
> If you can handle Twitter topologies cleanly, it is more than sufficient
> I believe.
>
> cheers
> /karthik
>
> > On May 4, 2018, at 2:31 PM, Thomas Cooper 
> wrote:
> >
> > Hi all,
> >
> > A while ago I emailed about the issue of how fields (key) grouped routing
> > in Heron was not consistent across an update and how this makes
> preserving
> > state across an update very difficult and also makes it
> > difficult/impossible to analyse or predict tuple flows through a
> > current/proposed topology physical plan.
> >
> > I suggested adopting Storm's approach of pre-defining a routing key
> > space for each component (eg 0-999), so that instead of an instance
> having
> > a single task id that gets reset at every update (eg 10) it has a range
> of
> > id's (eg 10-16) that changes depending on the parallelism of the
> component.
> > This has the advantage that a key will always hash to the same task ID
> for
> > the lifetime of the topology. Meaning recovering state for an instance
> > after a crash or update is just a case of pulling the state linked to the
> > keys in its task ID range.
> >
> > I know the above proposal has issues, not least of all placing a hard
> upper
> > limit on the scale out of a component, and that some alternative ideas
> are
> > being floated for solving the stateful update issue. However, I just
> wanted
> > to throw some more weight behind the Storm approach. There was a recent
> > paper about high-performance network load balancing
> > <.../datacenter-load-balancing-with-beamer/> that
> > describes an approach using a fixed key space similar to Storm's (see the
> > section called Stable Hashing - they assign a range 100x the expected
> > connection pool size - which we could do with Heron to prevent ever
> hitting
> > the upper scaling limit). Also, this new load balancer, Beamer, claims to
> > be twice as fast as Google's Maglev
> >  reliable-software-network-load-balancer/>
> > which again uses a pre-defined keyspace and ID ranges to create look-up
> > tables deterministically.
> >
> > I know a load balancer is a different beast to a stream grouping but
> there
> > are some interesting ideas in those papers (The links point to summary
> blog
> > posts so you don't have to read the whole paper).
> >
> > Anyway, I just thought I would throw those papers out there and see what people
> > think.
> >
> > Tom Cooper
> > W: www.tomcooper.org.uk  | Twitter: @tomncooper
> > 
>
>


Re: Stateful updating and deterministic routing

2018-05-04 Thread Ning Wang
Interesting. Thanks for sharing~

On Fri, May 4, 2018 at 2:31 PM, Thomas Cooper 
wrote:

> Hi all,
>
> A while ago I emailed about the issue of how fields (key) grouped routing
> in Heron was not consistent across an update and how this makes preserving
> state across an update very difficult and also makes it
> difficult/impossible to analyse or predict tuple flows through a
> current/proposed topology physical plan.
>
> I suggested adopting Storm's approach of pre-defining a routing key
> space for each component (eg 0-999), so that instead of an instance having
> a single task id that gets reset at every update (eg 10) it has a range of
> id's (eg 10-16) that changes depending on the parallelism of the component.
> This has the advantage that a key will always hash to the same task ID for
> the lifetime of the topology. Meaning recovering state for an instance
> after a crash or update is just a case of pulling the state linked to the
> keys in its task ID range.
>
> I know the above proposal has issues, not least of all placing a hard upper
> limit on the scale out of a component, and that some alternative ideas are
> being floated for solving the stateful update issue. However, I just wanted
> to throw some more weight behind the Storm approach. There was a recent
> paper about high-performance network load balancing
> <...with-beamer/> that
> describes an approach using a fixed key space similar to Storm's (see the
> section called Stable Hashing - they assign a range 100x the expected
> connection pool size - which we could do with Heron to prevent ever hitting
> the upper scaling limit). Also, this new load balancer, Beamer, claims to
> be twice as fast as Google's Maglev
>  reliable-software-network-load-balancer/>
> which again uses a pre-defined keyspace and ID ranges to create look-up
> tables deterministically.
>
> I know a load balancer is a different beast to a stream grouping but there
> are some interesting ideas in those papers (The links point to summary blog
> posts so you don't have to read the whole paper).
>
> Anyway, I just thought I would throw those papers out there and see what people
> think.
>
> Tom Cooper
> W: www.tomcooper.org.uk  | Twitter: @tomncooper
> 
>


Stateful updating and deterministic routing

2018-05-04 Thread Thomas Cooper
Hi all,

A while ago I emailed about the issue of how fields (key) grouped routing
in Heron was not consistent across an update and how this makes preserving
state across an update very difficult and also makes it
difficult/impossible to analyse or predict tuple flows through a
current/proposed topology physical plan.

I suggested adopting Storm's approach of pre-defining a routing key
space for each component (e.g. 0-999), so that instead of an instance having
a single task ID that gets reset at every update (e.g. 10), it has a range
of IDs (e.g. 10-16) that changes depending on the parallelism of the
component. This has the advantage that a key will always hash to the same
task ID for the lifetime of the topology. This means recovering state for an
instance after a crash or update is just a case of pulling the state linked
to the keys in its task ID range.
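
To make the idea concrete, here is a minimal sketch (illustrative names and
numbers, not Heron code) of a fixed key space in which a field value always
maps to the same task ID and each instance owns a contiguous slice of IDs:

    // Deterministic routing over a pre-defined key space.
    public final class FixedKeySpaceRouter {

      private final int keySpaceSize;

      // e.g. 1000, or ~100x the largest parallelism ever expected, so the
      // upper scaling limit is never hit.
      public FixedKeySpaceRouter(int keySpaceSize) {
        this.keySpaceSize = keySpaceSize;
      }

      // Stable for the lifetime of the topology, regardless of parallelism.
      public int taskIdFor(Object fieldValue) {
        return Math.floorMod(fieldValue.hashCode(), keySpaceSize);
      }

      // The instance owning a task ID under the current parallelism; only
      // the slice boundaries move on an update, so recovery is just pulling
      // the state for the task IDs in the new slice.
      public int instanceFor(int taskId, int parallelism) {
        int sliceSize = (keySpaceSize + parallelism - 1) / parallelism; // ceil
        return taskId / sliceSize;
      }
    }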

I know the above proposal has issues, not least of all placing a hard upper
limit on the scale out of a component, and that some alternative ideas are
being floated for solving the stateful update issue. However, I just wanted
to throw some more weight behind the Storm approach. There was a recent
paper about high-performance network load balancing
that
describes an approach using a fixed key space similar to Storm's (see the
section called Stable Hashing - they assign a range 100x the expected
connection pool size - which we could do with Heron to prevent ever hitting
the upper scaling limit). Also, this new load balancer, Beamer, claims to
be twice as fast as Google's Maglev

which again uses a pre-defined keyspace and ID ranges to create look-up
tables deterministically.

I know a load balancer is a different beast to a stream grouping but there
are some interesting ideas in those papers (The links point to summary blog
posts so you don't have to read the whole paper).

Anyway, I just thought I would throw those papers out there and see what people
think.

Tom Cooper
W: www.tomcooper.org.uk  | Twitter: @tomncooper