Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
Thanks for looking at it, Bill. I initially agreed with you, but Manikumar asked me to check whether it's really a regression before calling it a blocker. I tested 2.3 and found the same (buggy) behavior, so I don't think we can call it a regression, and therefore it's also not a blocker. I'm still working on the test, which is pretty tricky to write well, since so many components interoperate to produce the behavior. I still want to get this one fixed asap.

Thanks,
-John

On Tue, Nov 12, 2019 at 9:50 AM Bill Bejeck wrote:
Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
This could be a significant performance issue for some, so I think this fix needs to go into 2.4.

Just my 2 cents.

-Bill

On Mon, Nov 11, 2019 at 5:57 PM John Roesler wrote:
Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
Ok, created: https://github.com/apache/kafka/pull/7681

I'm on the fence about whether we should file this as a 2.4.0 blocker.

It _sounds_ like this would have a pretty big impact on performance. I'm not convinced about any correctness problems, though, since the changelogs are only configured with retention when the stores also have the same retention.

On the other hand, it doesn't look like a new regression introduced in 2.4, but it's a little hard to say where exactly the logical chain got broken, since there are quite a few code paths involved.

WDYT?
-John

On Mon, Nov 11, 2019 at 4:46 PM John Roesler wrote:
Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
Hi all,

I've just been looking over the code and Guozhang's reply... I think that the reply is reasonable, but it seems like the code may not be precisely implementing this logic.

As an entry point, in `StreamThread#runOnce`: if the state is `PARTITIONS_ASSIGNED`, we'll call `taskManager.updateNewAndRestoringTasks()`. If `active.allTasksRunning()`, we will invoke `assignStandbyPartitions()`. In `assignStandbyPartitions`, we get the offsets from `standbyTask.checkpointedOffsets()` (as mentioned, this only reflects the offsets as of the last time `StandbyTask#initializeStateStores()` was called, during `AssignedTasks#initializeNewTasks()`). Then, we simply `restoreConsumer.seek(partition, offset)` to whatever offset was there.

We don't seem to ever call `restoreConsumer.resume()`, which I think is what Guozhang was suggesting.

So, in summary, it does look to me like the bug as reported by Navinder is present. Just looking at the code flow, I'd guess that `checkpointedOffsets()` was supposed to be an unmodifiable view onto the stateMgr's checkpoint map. The code flow makes it hard to say at what point this whole process broke down. I'll prepare a fix, and we can take it step by step to consider which released branches to cherry-pick to.

Thanks,
-John

On Sun, Nov 10, 2019 at 8:11 PM Navinder Brar wrote:
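The suspected bug above can be sketched in miniature. This is NOT the actual Kafka Streams code; all class and method names below are toy stand-ins loosely modeled on the identifiers discussed in the thread. The point is the difference between a one-time copy of the checkpoint map taken in the standby task's constructor and an unmodifiable live view onto it:

```java
import java.util.HashMap;
import java.util.Map;

public class StaleCheckpointSketch {

    // Toy stand-in for the state manager's checkpoint map,
    // which is updated on every commit.
    static class StateManager {
        private final Map<String, Long> checkpoints = new HashMap<>();
        Map<String, Long> checkpointed() { return checkpoints; }
        void commit(String partition, long offset) { checkpoints.put(partition, offset); }
    }

    // Toy stand-in for a standby task.
    static class StandbyTask {
        private final Map<String, Long> checkpointedOffsets;
        StandbyTask(StateManager stateMgr) {
            // Suspected buggy behavior: a one-time COPY taken at construction.
            // A live view (e.g. Collections.unmodifiableMap(stateMgr.checkpointed()))
            // would have tracked later commits instead.
            this.checkpointedOffsets = new HashMap<>(stateMgr.checkpointed());
        }
        Map<String, Long> checkpointedOffsets() { return checkpointedOffsets; }
    }

    public static void main(String[] args) {
        StateManager stateMgr = new StateManager();
        stateMgr.commit("changelog-0", 100L);
        StandbyTask task = new StandbyTask(stateMgr);

        // The standby keeps consuming, and newer offsets get committed...
        stateMgr.commit("changelog-0", 5000L);

        // ...but on resume, the seek target is the stale snapshot.
        System.out.println("seek target = " + task.checkpointedOffsets().get("changelog-0"));
        System.out.println("latest checkpoint = " + stateMgr.checkpointed().get("changelog-0"));
    }
}
```

Running this prints a seek target of 100 while the latest checkpoint is 5000, which is the shape of the problem: `restoreConsumer.seek()` keeps being pointed at the constructor-time offsets.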
Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
Thanks Guozhang.

The jira is filed: [KAFKA-9169] Standby Tasks point ask for incorrect offsets on resuming post suspension - ASF JIRA

On Monday, 11 November, 2019, 03:10:37 am IST, Guozhang Wang wrote:

Could you file a JIRA report for this so that we can keep track of it and fix?

Guozhang

On Sun, Nov 10, 2019 at 1:39 PM Guozhang Wang wrote:
Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
If a standby task is suspended, it will write the checkpoint file again after flushing its state stores, and when it resumes it does not re-initialize the position on the consumer; hence it is still the task manager's responsibility to set the right starting offset from the latest checkpoint file. If we did not do that, that would still be a bug.

Guozhang

On Sat, Nov 9, 2019 at 11:33 AM Navinder Brar wrote:

Hi Guozhang,

Thanks for the reply.

So, if I understand it correctly: in versions where KIP-429 was not implemented, when we were suspending the standby tasks during rebalance and resuming them post rebalance, they would be reading from the beginning of the changelog offsets, since they would be reading from standbyTask.checkpointedOffsets(), which was only updated during the first initialization.

Regards,
Navinder

On Sunday, 10 November, 2019, 12:50:39 am IST, Guozhang Wang wrote:
Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
Hello Navinder,

Sorry for the late reply and thanks for bringing this up. I think this is indeed a bug that needs to be fixed.

The rationale behind it was the following: for restoring active tasks and processing standby tasks, we are using the same consumer client within the thread (the restoreConsumer). And before ALL of the active tasks have completed restoration, the consumer would not get assigned to any of the standby tasks at all. So the timeline of a rebalance, assuming KIP-429 is already in place, should look like this:

T0: rebalance triggered; some tasks get revoked but some others may still be active;
T0-T1: a subset of active tasks (via the main consumer) and all standby tasks (via the restore consumer) are still processing;
T1: rebalance finished; some new tasks get assigned and now need to be restored. The restore consumer is re-assigned to fetch from those restoring partitions only;
T1-T2: the main consumer has paused all partitions, hence no active tasks are processing; also, the restore consumer is only fetching for restoring tasks, hence no standby tasks are processing;
T2: restoration completed; the restore consumer is reassigned to those standby tasks.

Note that in T1 the standby tasks are all still "running", but they just do not make progress any more since the consumer has switched to fetching other partitions; so at T2, when the consumer switches back, it should just resume from where it left off.

Guozhang

On Mon, Nov 4, 2019 at 4:47 AM Navinder Brar wrote:
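The intended T1/T2 hand-off above can be sketched with a toy consumer. Again, this is not Kafka code; the classes and method names are invented for illustration. The intended behavior is that the standby partition's position is carried forward across the switch to restoring partitions and back, rather than re-seeking to the offset captured when the task was first constructed:

```java
import java.util.HashMap;
import java.util.Map;

public class RestoreSwitchSketch {

    // Toy consumer tracking a per-partition position, standing in
    // for the restore consumer's seek()/position() behavior.
    static class ToyConsumer {
        private final Map<String, Long> positions = new HashMap<>();
        void seek(String partition, long offset) { positions.put(partition, offset); }
        long position(String partition) { return positions.getOrDefault(partition, 0L); }
        // Pretend we consumed `records` records and advanced the position.
        void poll(String partition, long records) {
            positions.put(partition, position(partition) + records);
        }
    }

    public static void main(String[] args) {
        ToyConsumer restoreConsumer = new ToyConsumer();
        Map<String, Long> saved = new HashMap<>();

        // T0-T1: the standby partition is being processed; it started at its
        // checkpoint (100 in this toy run) and has advanced since.
        restoreConsumer.seek("standby-0", 100L);
        restoreConsumer.poll("standby-0", 400L);  // position is now 500

        // T1: switch to the restoring partitions; remember where the standby was.
        saved.put("standby-0", restoreConsumer.position("standby-0"));

        // T2: switch back; resume from the saved position, NOT the offset
        // captured at task construction.
        restoreConsumer.seek("standby-0", saved.get("standby-0"));
        System.out.println("resumed at " + restoreConsumer.position("standby-0"));
    }
}
```

Seeking back to the construction-time checkpoint (100) instead of the saved position (500) is exactly the re-reading behavior the thread describes.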
Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()
Hi,

Please let me know if this is not the correct forum to ask this. But I have a doubt I was hoping someone can clear up for me.

In TaskManager::updateNewAndRestoringTasks(), the function assignStandbyPartitions() gets called for all the running standby tasks, where it populates the Map checkpointedOffsets from standbyTask.checkpointedOffsets(), which is only updated at the time of initialization of a StandbyTask (i.e. in its constructor). I have checked, and this goes way back to the 1.1 version, when the rebalance protocol was old and standby tasks were suspended during rebalance and then resumed on assignment.

I want to know why, post resumption, we were/are reading standbyTask.checkpointedOffsets() to know the offset from where the standby task should start running, and not stateMgr.checkpointed(), which gets updated on every commit to the checkpoint file. In the former case it's always reading from the same offset, even those which it had already read earlier, and in cases where the changelog topic has a retention time, it gives an offsetOutOfRange exception.

Regards,
Navinder
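The offsetOutOfRange failure mode mentioned above can be illustrated with a toy log. This is not Kafka code (the real client surfaces this as an OffsetOutOfRangeException, handled according to auto.offset.reset); the classes here are invented to show why a stale checkpoint becomes fatal once retention kicks in: the broker deletes everything below the log-start offset, so a seek to the old offset has nothing to fetch:

```java
public class RetentionSketch {

    // Toy stand-in for a changelog topic partition with retention:
    // offsets below logStartOffset have been deleted.
    static class ToyLog {
        private final long logStartOffset;
        ToyLog(long logStartOffset) { this.logStartOffset = logStartOffset; }
        long fetchFrom(long offset) {
            if (offset < logStartOffset) {
                // The real client raises OffsetOutOfRangeException here.
                throw new IllegalStateException(
                    "offset " + offset + " out of range; log starts at " + logStartOffset);
            }
            return offset;
        }
    }

    public static void main(String[] args) {
        ToyLog changelog = new ToyLog(4000L);  // retention deleted offsets < 4000
        long staleCheckpoint = 100L;           // snapshot from task construction
        long freshCheckpoint = 4500L;          // what a live checkpoint would report

        System.out.println("fresh seek ok at " + changelog.fetchFrom(freshCheckpoint));
        try {
            changelog.fetchFrom(staleCheckpoint);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

So without retention the bug "only" re-reads the changelog from the stale offset; with retention, the stale offset can fall below the log start and the fetch fails outright.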