Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-25 Thread Guozhang Wang
1) You are right: a thread's restoration phase will not interfere with any
other thread's normal processing, even when they are collocated within the
same JVM / machine. So you may have a Streams instance in which some threads
have already finished restoring and started processing tasks, while other
threads are still restoring.
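
If you want to observe that restoration phase yourself, one minimal sketch is
to register a StateRestoreListener before starting the instance. The streams
object below is assumed to be built elsewhere, and the log lines are only
placeholders:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Register before streams.start(); the callbacks fire per store partition, so
// you can see each thread's restoration begin and end independently of the
// other threads that keep processing.
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("restore start: %s %s [%d..%d]%n",
                          storeName, tp, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName,
                                long batchEndOffset, long numRestored) {
        // called after each restored batch; handy for progress metrics
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName,
                             long totalRestored) {
        System.out.printf("restore done: %s %s (%d records)%n",
                          storeName, tp, totalRestored);
    }
});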


Guozhang

On Mon, Feb 25, 2019 at 1:53 PM Adam Bellemare wrote:

> Hi Guozhang -
>
> Thanks for the replies, and directing me to the existing JIRAs. I think
> that a two-phase rebalance will be quite useful.
>
> 1) For clarity's sake, I should have just asked: When a new thread / node
> is created and tasks are rebalanced, are the state stores on the new
> threads/nodes fully restored during rebalancing, thereby blocking *any and
> all* threads from proceeding with processing until restoration is complete?
> I do not believe this is the case; rather, only the threads assigned the new
> tasks will be paused until their state store restoration is complete.
>
>
> Thanks for your help - I appreciate you taking the time to reply.
>
> Adam
>
>
>
> On Wed, Feb 20, 2019 at 8:38 PM Guozhang Wang  wrote:
>
> > Hello Adam,
> >
> > Sorry for the late reply on this thread; I've put my comments inline
> > below.
> >
> > On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare wrote:
> >
> > > Hey Folks
> > >
> > > I have a few questions around the operations of stateful processing
> while
> > > scaling nodes up/down, and a possible KIP in question #4. Most of them
> > have
> > > to do with task processing during rebuilding of state stores after
> > scaling
> > > nodes up.
> > >
> > > Scenario:
> > > Single node/thread, processing 2 topics (10 partitions each):
> > > User event topic (events) - ie: key:userId, value: ProductId
> > > Product topic (entity) - ie: key: ProductId, value: productData
> > >
> > > My topology looks like this:
> > >
> > > KTable productTable = ... //materialize from product topic
> > >
> > > KStream output = userStream
> > > .map(x => (x.value, x.key) ) //Swap the key and value around
> > > .join(productTable, ... ) //Joiner is not relevant here
> > > .to(...)  //Send it to some output topic
> > >
> > >
> > > Here are my questions:
> > > 1) If I scale the processing node count up, partitions will be
> rebalanced
> > > to the new node. Does processing continue as normal on the original
> node,
> > > while the new node's processing is paused as the internal state stores
> > are
> > > rebuilt/reloaded? From my reading of the code (and own experience) I
> > > believe this to be the case, but I am just curious in case I missed
> > > something.
> > >
> > >
> > With 2 topics and 10 partitions each, assuming the default
> PartitionGrouper
> > is used, there should be a total of 20 tasks (10 tasks for map which will
> > send to an internal repartition topic, and 10 tasks for doing the join)
> > created since these two topics are co-partitioned for joins.
> >
> > For example, task-0 would be processing the join from
> > user-topic-partition-0 and product-topic-partition-0, and so on.
> >
> > With a single thread, all of these 20 tasks will be allocated to this
> > thread, which would process them in an iterative manner. Note that since
> > each task has its own state store (e.g. product-state-store-0 for task-0,
> > etc.), this thread will host all 10 sets of state stores as
> > well (note that the 10 mapping tasks have no state stores at all).
> >
> > When you add new threads either within the same node, or on a different
> > node, after rebalance each thread should be processing 10 tasks, and
> > hence own the corresponding set of state stores. The new thread
> > will first restore the state stores it is assigned before it starts
> > processing.
> >
> >
> > > 2) What happens to the userStream map task? Will the new node be able
> to
> > > process this task while the state store is rebuilding/reloading? My
> > reading
> > > of the code suggests that this map process will be paused on the new
> node
> > > while the state store is rebuilt. The effect of this is that it will
> lead
> > > to a delay in events reaching the original node's partitions, which
> will
> > be
> > > seen as late-arriving events. Am I right in this assessment?
> > >
> > >
> > Currently the thread will NOT start processing any tasks until ALL stateful
> > tasks complete restoring (stateless tasks, like the map tasks in your
> > example, never need restoration at all). There's an open JIRA for making this
> > customizable, but I cannot find it at the moment.
> >
> >
> > > 3) How does scaling up work with standby state-store replicas? From my
> > > reading of the code, it appears that scaling a node up will result in a
> > > rebalance, with the state assigned to the new node being rebuilt first
> > > (leading to a pause in processing). Following this, the standby replicas
> > > are populated. Am I correct in this reading?
> > >
> > > 

Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-25 Thread Adam Bellemare
Hi Guozhang -

Thanks for the replies, and directing me to the existing JIRAs. I think
that a two-phase rebalance will be quite useful.

1) For clarity's sake, I should have just asked: When a new thread / node
is created and tasks are rebalanced, are the state stores on the new
threads/nodes fully restored during rebalancing, thereby blocking *any and
all* threads from proceeding with processing until restoration is complete?
I do not believe this is the case; rather, only the threads assigned the new
tasks will be paused until their state store restoration is complete.


Thanks for your help - I appreciate you taking the time to reply.

Adam



On Wed, Feb 20, 2019 at 8:38 PM Guozhang Wang  wrote:

> Hello Adam,
>
> Sorry for the late reply on this thread; I've put my comments inline
> below.
>
> On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare wrote:
>
> > Hey Folks
> >
> > I have a few questions around the operations of stateful processing while
> > scaling nodes up/down, and a possible KIP in question #4. Most of them
> have
> > to do with task processing during rebuilding of state stores after
> scaling
> > nodes up.
> >
> > Scenario:
> > Single node/thread, processing 2 topics (10 partitions each):
> > User event topic (events) - ie: key:userId, value: ProductId
> > Product topic (entity) - ie: key: ProductId, value: productData
> >
> > My topology looks like this:
> >
> > KTable productTable = ... //materialize from product topic
> >
> > KStream output = userStream
> > .map(x => (x.value, x.key) ) //Swap the key and value around
> > .join(productTable, ... ) //Joiner is not relevant here
> > .to(...)  //Send it to some output topic
> >
> >
> > Here are my questions:
> > 1) If I scale the processing node count up, partitions will be rebalanced
> > to the new node. Does processing continue as normal on the original node,
> > while the new node's processing is paused as the internal state stores
> are
> > rebuilt/reloaded? From my reading of the code (and own experience) I
> > believe this to be the case, but I am just curious in case I missed
> > something.
> >
> >
> With 2 topics and 10 partitions each, assuming the default PartitionGrouper
> is used, there should be a total of 20 tasks (10 tasks for map which will
> send to an internal repartition topic, and 10 tasks for doing the join)
> created since these two topics are co-partitioned for joins.
>
> For example, task-0 would be processing the join from
> user-topic-partition-0 and product-topic-partition-0, and so on.
>
> With a single thread, all of these 20 tasks will be allocated to this
> thread, which would process them in an iterative manner. Note that since
> each task has its own state store (e.g. product-state-store-0 for task-0,
> etc.), this thread will host all 10 sets of state stores as
> well (note that the 10 mapping tasks have no state stores at all).
>
> When you add new threads either within the same node, or on a different
> node, after rebalance each thread should be processing 10 tasks, and hence
> own the corresponding set of state stores. The new thread
> will first restore the state stores it is assigned before it starts
> processing.
>
>
> > 2) What happens to the userStream map task? Will the new node be able to
> > process this task while the state store is rebuilding/reloading? My
> reading
> > of the code suggests that this map process will be paused on the new node
> > while the state store is rebuilt. The effect of this is that it will lead
> > to a delay in events reaching the original node's partitions, which will
> be
> > seen as late-arriving events. Am I right in this assessment?
> >
> >
> Currently the thread will NOT start processing any tasks until ALL stateful
> tasks complete restoring (stateless tasks, like the map tasks in your
> example, never need restoration at all). There's an open JIRA for making this
> customizable, but I cannot find it at the moment.
>
>
> > 3) How does scaling up work with standby state-store replicas? From my
> > reading of the code, it appears that scaling a node up will result in a
> > rebalance, with the state assigned to the new node being rebuilt first
> > (leading to a pause in processing). Following this, the standby replicas
> > are populated. Am I correct in this reading?
> >
> Standby tasks run in parallel with active stream tasks; they simply read
> from the changelog topic in real time and populate the standby
> store replica. When scaling out, the instances that already host a standby
> for a task will be preferred over those that do not, and hence
> only a very small amount of data needs to be restored when the task moves
> (think: the standby replica of the store is already populated up to offset
> 90 at the rebalance, while the active task is writing to the changelog
> topic with log end offset 100, so you only need to restore offsets 90 to 100
> instead of 0 to 100).
>
>
> > 4) If my re

Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-20 Thread Guozhang Wang
Hello Adam,

Sorry for the late reply on this thread; I've put my comments inline
below.

On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare wrote:

> Hey Folks
>
> I have a few questions around the operations of stateful processing while
> scaling nodes up/down, and a possible KIP in question #4. Most of them have
> to do with task processing during rebuilding of state stores after scaling
> nodes up.
>
> Scenario:
> Single node/thread, processing 2 topics (10 partitions each):
> User event topic (events) - ie: key:userId, value: ProductId
> Product topic (entity) - ie: key: ProductId, value: productData
>
> My topology looks like this:
>
> KTable productTable = ... //materialize from product topic
>
> KStream output = userStream
> .map(x => (x.value, x.key) ) //Swap the key and value around
> .join(productTable, ... ) //Joiner is not relevant here
> .to(...)  //Send it to some output topic
>
>
> Here are my questions:
> 1) If I scale the processing node count up, partitions will be rebalanced
> to the new node. Does processing continue as normal on the original node,
> while the new node's processing is paused as the internal state stores are
> rebuilt/reloaded? From my reading of the code (and own experience) I
> believe this to be the case, but I am just curious in case I missed
> something.
>
>
With 2 topics and 10 partitions each, assuming the default PartitionGrouper
is used, there should be a total of 20 tasks (10 tasks for map which will
send to an internal repartition topic, and 10 tasks for doing the join)
created since these two topics are co-partitioned for joins.

For example, task-0 would be processing the join from
user-topic-partition-0 and product-topic-partition-0, and so on.

With a single thread, all of these 20 tasks will be allocated to this
thread, which would process them in an iterative manner. Note that since
each task has its own state store (e.g. product-state-store-0 for task-0,
etc.), this thread will host all 10 sets of state stores as
well (note that the 10 mapping tasks have no state stores at all).

When you add new threads either within the same node, or on a different
node, after rebalance each thread should be processing 10 tasks, and hence
own the corresponding set of state stores. The new thread
will first restore the state stores it is assigned before it starts
processing.
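
To make "adding threads / nodes" concrete: it just means starting another
KafkaStreams instance with the same application.id, and/or raising
num.stream.threads. A rough sketch, where the application id, bootstrap
servers and thread count are only placeholders:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Same application.id on every instance -> they join the same consumer group
// and the 20 tasks get redistributed across all live threads on rebalance.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-product-join");  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);                // threads per instance

KafkaStreams streams = new KafkaStreams(topology, props);  // topology as in your example
streams.start();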


> 2) What happens to the userStream map task? Will the new node be able to
> process this task while the state store is rebuilding/reloading? My reading
> of the code suggests that this map process will be paused on the new node
> while the state store is rebuilt. The effect of this is that it will lead
> to a delay in events reaching the original node's partitions, which will be
> seen as late-arriving events. Am I right in this assessment?
>
>
Currently the thread will NOT start processing any tasks until ALL stateful
tasks complete restoring (stateless tasks, like the map tasks in your
example, never need restoration at all). There's an open JIRA for making this
customizable, but I cannot find it at the moment.
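
One easy way to see that pause from the outside is a state listener: the
client should only transition from REBALANCING to RUNNING once its assigned
stateful tasks have finished restoring. A minimal sketch, again assuming the
streams object from the sketch above and registered before streams.start():

// Logs client-level state transitions such as REBALANCING -> RUNNING.
streams.setStateListener((newState, oldState) ->
    System.out.printf("streams state: %s -> %s%n", oldState, newState));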


> 3) How does scaling up work with standby state-store replicas? From my
> reading of the code, it appears that scaling a node up will result in a
> rebalance, with the state assigned to the new node being rebuilt first
> (leading to a pause in processing). Following this, the standby replicas are
> populated. Am I correct in this reading?
>
Standby tasks run in parallel with active stream tasks; they simply read
from the changelog topic in real time and populate the standby
store replica. When scaling out, the instances that already host a standby
for a task will be preferred over those that do not, and hence
only a very small amount of data needs to be restored when the task moves
(think: the standby replica of the store is already populated up to offset
90 at the rebalance, while the active task is writing to the changelog
topic with log end offset 100, so you only need to restore offsets 90 to 100
instead of 0 to 100).
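
Standby replicas are off by default; enabling them is a one-line addition to
the config sketch above (the value 1 is just an example):

// One warm replica per state store, maintained on another thread/instance, so
// a task that moves there only needs to catch up on the tail of the changelog.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);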


> 4) If my reading in #3 is correct, would it be possible to pre-populate the
> standby stores on scale-up before initiating active-task transfer? This
> would allow seamless scale-up and scale-down without requiring any pauses
> for rebuilding state. I am interested in kicking this off as a KIP if so,
> but would appreciate any JIRAs or related KIPs to read up on prior to
> digging into this.
>
Yes, there is some discussion about this here:
https://issues.apache.org/jira/browse/KAFKA-6145


>
> Thanks
>
> Adam Bellemare
>


-- 
-- Guozhang


Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-07 Thread McCaig, Rhys
Adam,

I don’t have an answer for you, but I would also be interested in clarification
of this process if anyone can provide more details. If your reading is correct,
I would welcome the KIP to reduce the scaling pauses.

Cheers,
Rhys McCaig

> On Feb 6, 2019, at 7:44 AM, Adam Bellemare  wrote:
> 
> Bump - hoping someone has some insight. Alternately, redirection to a more
> suitable forum.
> 
> Thanks
> 
> On Sun, Feb 3, 2019 at 10:25 AM Adam Bellemare wrote:
> 
>> Hey Folks
>> 
>> I have a few questions around the operations of stateful processing while
>> scaling nodes up/down, and a possible KIP in question #4. Most of them have
>> to do with task processing during rebuilding of state stores after scaling
>> nodes up.
>> 
>> Scenario:
>> Single node/thread, processing 2 topics (10 partitions each):
>> User event topic (events) - ie: key:userId, value: ProductId
>> Product topic (entity) - ie: key: ProductId, value: productData
>> 
>> My topology looks like this:
>> 
>> KTable productTable = ... //materialize from product topic
>> 
>> KStream output = userStream
>>.map(x => (x.value, x.key) ) //Swap the key and value around
>>.join(productTable, ... ) //Joiner is not relevant here
>>.to(...)  //Send it to some output topic
>> 
>> 
>> Here are my questions:
>> 1) If I scale the processing node count up, partitions will be rebalanced
>> to the new node. Does processing continue as normal on the original node,
>> while the new node's processing is paused as the internal state stores are
>> rebuilt/reloaded? From my reading of the code (and own experience) I
>> believe this to be the case, but I am just curious in case I missed
>> something.
>> 
>> 2) What happens to the userStream map task? Will the new node be able to
>> process this task while the state store is rebuilding/reloading? My reading
>> of the code suggests that this map process will be paused on the new node
>> while the state store is rebuilt. The effect of this is that it will lead
>> to a delay in events reaching the original node's partitions, which will be
>> seen as late-arriving events. Am I right in this assessment?
>> 
>> 3) How does scaling up work with standby state-store replicas? From my
>> reading of the code, it appears that scaling a node up will result in a
>> rebalance, with the state assigned to the new node being rebuilt first
>> (leading to a pause in processing). Following this, the standby replicas are
>> populated. Am I correct in this reading?
>> 
>> 4) If my reading in #3 is correct, would it be possible to pre-populate
>> the standby stores on scale-up before initiating active-task transfer? This
>> would allow seamless scale-up and scale-down without requiring any pauses
>> for rebuilding state. I am interested in kicking this off as a KIP if so,
>> but would appreciate any JIRAs or related KIPs to read up on prior to
>> digging into this.
>> 
>> 
>> Thanks
>> 
>> Adam Bellemare
>> 



Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-06 Thread Adam Bellemare
Bump - hoping someone has some insight. Alternately, redirection to a more
suitable forum.

Thanks

On Sun, Feb 3, 2019 at 10:25 AM Adam Bellemare wrote:

> Hey Folks
>
> I have a few questions around the operations of stateful processing while
> scaling nodes up/down, and a possible KIP in question #4. Most of them have
> to do with task processing during rebuilding of state stores after scaling
> nodes up.
>
> Scenario:
> Single node/thread, processing 2 topics (10 partitions each):
> User event topic (events) - ie: key:userId, value: ProductId
> Product topic (entity) - ie: key: ProductId, value: productData
>
> My topology looks like this:
>
> KTable productTable = ... //materialize from product topic
>
> KStream output = userStream
> .map(x => (x.value, x.key) ) //Swap the key and value around
> .join(productTable, ... ) //Joiner is not relevant here
> .to(...)  //Send it to some output topic
>
>
> Here are my questions:
> 1) If I scale the processing node count up, partitions will be rebalanced
> to the new node. Does processing continue as normal on the original node,
> while the new node's processing is paused as the internal state stores are
> rebuilt/reloaded? From my reading of the code (and own experience) I
> believe this to be the case, but I am just curious in case I missed
> something.
>
> 2) What happens to the userStream map task? Will the new node be able to
> process this task while the state store is rebuilding/reloading? My reading
> of the code suggests that this map process will be paused on the new node
> while the state store is rebuilt. The effect of this is that it will lead
> to a delay in events reaching the original node's partitions, which will be
> seen as late-arriving events. Am I right in this assessment?
>
> 3) How does scaling up work with standby state-store replicas? From my
> reading of the code, it appears that scaling a node up will result in a
> rebalance, with the state assigned to the new node being rebuilt first
> (leading to a pause in processing). Following this, the standby replicas are
> populated. Am I correct in this reading?
>
> 4) If my reading in #3 is correct, would it be possible to pre-populate
> the standby stores on scale-up before initiating active-task transfer? This
> would allow seamless scale-up and scale-down without requiring any pauses
> for rebuilding state. I am interested in kicking this off as a KIP if so,
> but would appreciate any JIRAs or related KIPs to read up on prior to
> digging into this.
>
>
> Thanks
>
> Adam Bellemare
>


Statestore restoration & scaling questions - possible KIP as well.

2019-02-03 Thread Adam Bellemare
Hey Folks

I have a few questions around the operations of stateful processing while
scaling nodes up/down, and a possible KIP in question #4. Most of them have
to do with task processing during rebuilding of state stores after scaling
nodes up.

Scenario:
Single node/thread, processing 2 topics (10 partitions each):
User event topic (events) - ie: key:userId, value: ProductId
Product topic (entity) - ie: key: ProductId, value: productData

My topology looks like this:

KTable productTable = ... //materialize from product topic

KStream output = userStream
.map(x => (x.value, x.key) ) //Swap the key and value around
.join(productTable, ... ) //Joiner is not relevant here
.to(...)  //Send it to some output topic
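
For concreteness, here is roughly what that looks like in the Java DSL; the
topic names, types and serdes below are made up purely for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

// Materialized KTable from the product (entity) topic: key = productId
KTable<String, String> productTable =
    builder.table("product-topic", Consumed.with(Serdes.String(), Serdes.String()));

// User event stream: key = userId, value = productId
KStream<String, String> userStream =
    builder.stream("user-events", Consumed.with(Serdes.String(), Serdes.String()));

userStream
    // Swap the key and value so the stream is keyed by productId
    // (this triggers a repartition before the join)
    .map((userId, productId) -> KeyValue.pair(productId, userId))
    // Join against the product table; the joiner itself is not relevant here
    .join(productTable, (userId, productData) -> userId + "|" + productData)
    // Send it to some output topic
    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));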


Here are my questions:
1) If I scale the processing node count up, partitions will be rebalanced
to the new node. Does processing continue as normal on the original node,
while the new node's processing is paused as the internal state stores are
rebuilt/reloaded? From my reading of the code (and own experience) I
believe this to be the case, but I am just curious in case I missed
something.

2) What happens to the userStream map task? Will the new node be able to
process this task while the state store is rebuilding/reloading? My reading
of the code suggests that this map process will be paused on the new node
while the state store is rebuilt. The effect of this is that it will lead
to a delay in events reaching the original node's partitions, which will be
seen as late-arriving events. Am I right in this assessment?

3) How does scaling up work with standby state-store replicas? From my
reading of the code, it appears that scaling a node up will result in a
rebalance, with the state assigned to the new node being rebuilt first
(leading to a pause in processing). Following this, the standby replicas are
populated. Am I correct in this reading?

4) If my reading in #3 is correct, would it be possible to pre-populate the
standby stores on scale-up before initiating active-task transfer? This
would allow seamless scale-up and scale-down without requiring any pauses
for rebuilding state. I am interested in kicking this off as a KIP if so,
but would appreciate any JIRAs or related KIPs to read up on prior to
digging into this.


Thanks

Adam Bellemare