Re: Statestore restoration & scaling questions - possible KIP as well.
1) You are right: a thread's restoration phase will not interfere with any other thread's normal processing at all, even for threads collocated within the same JVM / machine. So you may have a Streams instance in which some threads have already finished restoring and started processing tasks, while other threads are still restoring.

Guozhang

On Mon, Feb 25, 2019 at 1:53 PM Adam Bellemare wrote:

> Hi Guozhang -
>
> Thanks for the replies, and directing me to the existing JIRAs. I think
> that a two-phase rebalance will be quite useful.
>
> 1) For clarity's sake, I should have just asked: When a new thread / node
> is created and tasks are rebalanced, are the state stores on the new
> threads/nodes fully restored during rebalancing, thereby blocking *any and
> all* threads from proceeding with processing until restoration is complete?
> I do not believe that this is the case, and in the case of rebalanced tasks
> only the threads assigned the new tasks will be paused until state store
> restoration is complete.
>
> Thanks for your help - I appreciate you taking the time to reply.
>
> Adam
>
> [...]
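Guozhang's point above (a restoring thread does not block collocated threads, since each thread only restores the stores of its own stateful tasks) can be illustrated with a toy model. This is an illustrative sketch only: the even round-robin split and the task names are assumptions, not Kafka Streams' actual sticky assignor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskSpreadSketch {
    // Toy round-robin split of tasks across threads (the real assignor is
    // stickier and standby-aware; this only shows the shape of the outcome).
    public static Map<Integer, List<String>> assign(List<String> tasks, int threads) {
        Map<Integer, List<String>> byThread = new HashMap<>();
        for (int i = 0; i < tasks.size(); i++) {
            byThread.computeIfAbsent(i % threads, t -> new ArrayList<>()).add(tasks.get(i));
        }
        return byThread;
    }

    public static void main(String[] args) {
        // The scenario from this thread: 10 stateless map tasks plus
        // 10 stateful join tasks, spread over 2 threads.
        List<String> tasks = new ArrayList<>();
        for (int p = 0; p < 10; p++) tasks.add("map-" + p);   // stateless: nothing to restore
        for (int p = 0; p < 10; p++) tasks.add("join-" + p);  // stateful: store restored before processing
        Map<Integer, List<String>> byThread = assign(tasks, 2);

        // Each thread restores only the stores of its own join tasks; a thread
        // whose stateful tasks are caught up keeps processing regardless of
        // whether another thread in the same JVM is still restoring.
        long thread0Stateful = byThread.get(0).stream().filter(t -> t.startsWith("join")).count();
        System.out.println(byThread.get(0).size() + " tasks, " + thread0Stateful + " stateful");
        // prints "10 tasks, 5 stateful"
    }
}
```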
Re: Statestore restoration & scaling questions - possible KIP as well.
Hi Guozhang -

Thanks for the replies, and directing me to the existing JIRAs. I think that a two-phase rebalance will be quite useful.

1) For clarity's sake, I should have just asked: When a new thread / node is created and tasks are rebalanced, are the state stores on the new threads/nodes fully restored during rebalancing, thereby blocking *any and all* threads from proceeding with processing until restoration is complete? I do not believe that this is the case, and in the case of rebalanced tasks only the threads assigned the new tasks will be paused until state store restoration is complete.

Thanks for your help - I appreciate you taking the time to reply.

Adam

On Wed, Feb 20, 2019 at 8:38 PM Guozhang Wang wrote:

> Hello Adam,
>
> Sorry for being late replying on this thread; I've put my comments
> inlined below.
>
> [...]
Re: Statestore restoration & scaling questions - possible KIP as well.
Hello Adam,

Sorry for being late replying on this thread; I've put my comments inlined below.

On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare wrote:

> Hey Folks
>
> I have a few questions around the operations of stateful processing while
> scaling nodes up/down, and a possible KIP in question #4. Most of them have
> to do with task processing during rebuilding of state stores after scaling
> nodes up.
>
> Scenario:
> Single node/thread, processing 2 topics (10 partitions each):
> User event topic (events) - ie: key: userId, value: ProductId
> Product topic (entity) - ie: key: ProductId, value: productData
>
> My topology looks like this:
>
> KTable productTable = ...            //materialize from product topic
>
> KStream output = userStream
>   .map(x => (x.value, x.key))        //Swap the key and value around
>   .join(productTable, ...)           //Joiner is not relevant here
>   .to(...)                           //Send it to some output topic
>
> Here are my questions:
> 1) If I scale the processing node count up, partitions will be rebalanced
> to the new node. Does processing continue as normal on the original node,
> while the new node's processing is paused as the internal state stores are
> rebuilt/reloaded? From my reading of the code (and own experience) I
> believe this to be the case, but I am just curious in case I missed
> something.

With 2 topics and 10 partitions each, assuming the default PartitionGrouper is used, there should be a total of 20 tasks created (10 tasks for the map, which will send to an internal repartition topic, and 10 tasks for doing the join), since these two topics are co-partitioned for joins.

For example, task-0 would be processing the join from user-topic-partition-0 and product-topic-partition-0, and so on.

With a single thread, all 20 of these tasks will be allocated to that thread, which processes them in an iterative manner. Note that since each task has its own state store (e.g. product-state-store-0 for task-0, etc.), this thread will host all 10 sets of state stores as well (the 10 mapping tasks have no state stores at all).

When you add new threads, either within the same node or on a different node, after the rebalance each thread should be processing 10 tasks, and hence owning the corresponding set of state stores. The new thread will first restore the state stores it gets assigned before it starts processing.

> 2) What happens to the userStream map task? Will the new node be able to
> process this task while the state store is rebuilding/reloading? My reading
> of the code suggests that this map process will be paused on the new node
> while the state store is rebuilt. The effect of this is that it will lead
> to a delay in events reaching the original node's partitions, which will be
> seen as late-arriving events. Am I right in this assessment?

Currently the thread will NOT start processing any tasks until ALL stateful tasks complete restoring (stateless tasks, like the map tasks in your example, never need restoration at all). There's an open JIRA for making this customizable, but I cannot find it currently.

> 3) How does scaling up work with standby state-store replicas? From my
> reading of the code, it appears that scaling a node up will result in a
> rebalance, with the state assigned to the new node being rebuilt first
> (leading to a pause in processing). Following this, the standby replicas
> are populated. Am I correct in this reading?

Standby tasks run in parallel with the active stream tasks, simply reading from the changelog topic in real time to populate the standby store replica. When scaling out, instances that already host a standby for a task will be preferred over those that do not, and hence only a very small amount of data needs to be restored (think: the standby replica of the store is already populated up to offset 90 at the rebalance, while the active task has written to the changelog topic up to log end offset 100, so you only need to restore offsets 90 - 100 instead of 0 - 100).

> 4) If my reading in #3 is correct, would it be possible to pre-populate the
> standby stores on scale-up before initiating active-task transfer? This
> would allow seamless scale-up and scale-down without requiring any pauses
> for rebuilding state. I am interested in kicking this off as a KIP if so,
> but would appreciate any JIRAs or related KIPs to read up on prior to
> digging into this.

Yes, there is some discussion about this here: https://issues.apache.org/jira/browse/KAFKA-6145

> Thanks
>
> Adam Bellemare

--
-- Guozhang
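The offset arithmetic in the answer to question 3 can be made concrete with a small sketch. Only `num.standby.replicas` is the real Kafka Streams config key (it defaults to 0); the application id, class name, and helper method here are illustrative assumptions.

```java
import java.util.Properties;

public class StandbyRestoreSketch {
    // Records to replay into a store = changelog log end offset minus the
    // offset already present locally (kept warm by a standby, or 0 cold).
    public static long recordsToRestore(long logEndOffset, long localOffset) {
        return logEndOffset - localOffset;
    }

    public static void main(String[] args) {
        // Enabling standbys: "num.standby.replicas" is the actual Streams
        // config key; the application id is a placeholder.
        Properties props = new Properties();
        props.put("application.id", "user-product-join"); // illustrative name
        props.put("num.standby.replicas", "1");

        // Guozhang's example: standby caught up to offset 90, changelog end at 100.
        System.out.println(recordsToRestore(100, 90)); // warm standby: prints 10
        System.out.println(recordsToRestore(100, 0));  // cold rebuild: prints 100
    }
}
```

The gap a warm standby must close is only the records the active task wrote during the rebalance window, which is why assigning tasks to instances that already host a standby makes failover and scale-out pauses short.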
Re: Statestore restoration & scaling questions - possible KIP as well.
Adam,

I don't have an answer for you, but I would also be interested in clarification of this process if anyone can provide more details. If your reading is correct I would welcome the KIP to reduce the scaling pauses.

Cheers,
Rhys McCaig

> On Feb 6, 2019, at 7:44 AM, Adam Bellemare wrote:
>
> Bump - hoping someone has some insight. Alternately, redirection to a more
> suitable forum.
>
> Thanks
>
> [...]
Re: Statestore restoration & scaling questions - possible KIP as well.
Bump - hoping someone has some insight. Alternately, redirection to a more suitable forum.

Thanks

On Sun, Feb 3, 2019 at 10:25 AM Adam Bellemare wrote:

> [...]
Statestore restoration & scaling questions - possible KIP as well.
Hey Folks

I have a few questions around the operations of stateful processing while scaling nodes up/down, and a possible KIP in question #4. Most of them have to do with task processing during rebuilding of state stores after scaling nodes up.

Scenario:
Single node/thread, processing 2 topics (10 partitions each):
User event topic (events) - ie: key: userId, value: ProductId
Product topic (entity) - ie: key: ProductId, value: productData

My topology looks like this:

KTable productTable = ...            //materialize from product topic

KStream output = userStream
  .map(x => (x.value, x.key))        //Swap the key and value around
  .join(productTable, ...)           //Joiner is not relevant here
  .to(...)                           //Send it to some output topic

Here are my questions:

1) If I scale the processing node count up, partitions will be rebalanced to the new node. Does processing continue as normal on the original node, while the new node's processing is paused as the internal state stores are rebuilt/reloaded? From my reading of the code (and own experience) I believe this to be the case, but I am just curious in case I missed something.

2) What happens to the userStream map task? Will the new node be able to process this task while the state store is rebuilding/reloading? My reading of the code suggests that this map process will be paused on the new node while the state store is rebuilt. The effect of this is that it will lead to a delay in events reaching the original node's partitions, which will be seen as late-arriving events. Am I right in this assessment?

3) How does scaling up work with standby state-store replicas? From my reading of the code, it appears that scaling a node up will result in a rebalance, with the state assigned to the new node being rebuilt first (leading to a pause in processing). Following this, the standby replicas are populated. Am I correct in this reading?

4) If my reading in #3 is correct, would it be possible to pre-populate the standby stores on scale-up before initiating active-task transfer? This would allow seamless scale-up and scale-down without requiring any pauses for rebuilding state. I am interested in kicking this off as a KIP if so, but would appreciate any JIRAs or related KIPs to read up on prior to digging into this.

Thanks

Adam Bellemare
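Rendered in the Java Streams DSL, the pseudocode topology discussed in this thread might look like the following. This is a hedged sketch, not the poster's actual code: the topic names, the String key/value types, and the value joiner are assumptions. Note that because `map` changes the key, Streams inserts the internal repartition topic before the join, which is exactly the stateless map task / stateful join task split Guozhang describes.

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class UserProductJoinTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Materialized from the product topic (key: ProductId, value: productData).
        KTable<String, String> productTable = builder.table("product-topic");

        // User event topic (key: userId, value: ProductId).
        KStream<String, String> userStream = builder.stream("user-events");

        userStream
            // Swap key and value so the stream is keyed by ProductId; changing
            // the key forces an internal repartition topic before the join.
            .map((userId, productId) -> KeyValue.pair(productId, userId))
            // Joiner just pairs the two values; its logic is not relevant here.
            .join(productTable, (userId, productData) -> userId + "|" + productData)
            .to("user-product-output");

        return builder.build();
    }
}
```

Only the join side of this topology carries a state store (one per co-partitioned partition pair), so only those tasks need restoration when a new thread or node picks them up after a rebalance.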