On Thu, Mar 07, 2024 at 01:21:50AM -0500, Joel Fernandes wrote:
> 
> 
> On 3/6/2024 5:31 PM, Joel Fernandes wrote:
> > 
> > 
> > On 3/5/2024 2:57 PM, Uladzislau Rezki (Sony) wrote:
> >> Fix a below race by not releasing a wait-head from the
> >> GP-kthread as it can lead for reusing it whereas a worker
> >> can still access it thus execute newly added callbacks too
> >> early.
> >>
> >> CPU 0                              CPU 1
> >> -----                              -----
> >>
> >> // wait_tail == HEAD1
> >> rcu_sr_normal_gp_cleanup() {
> >>     // has passed SR_MAX_USERS_WAKE_FROM_GP
> >>     wait_tail->next = next;
> >>     // done_tail = HEAD1
> >>     smp_store_release(&rcu_state.srs_done_tail, wait_tail);
> >>     queue_work() {
> >>         test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)
> >>         __queue_work()
> >>     }
> >> }
> >>
> >>                                set_work_pool_and_clear_pending()
> >>                                rcu_sr_normal_gp_cleanup_work() {
> >> // new GP, wait_tail == HEAD2
> >> rcu_sr_normal_gp_cleanup() {
> >>     // executes all completion, but stop at HEAD1
> >>     wait_tail->next = HEAD1;
> >>     // done_tail = HEAD2
> >>     smp_store_release(&rcu_state.srs_done_tail, wait_tail);
> >>     queue_work() {
> >>         test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)
> >>         __queue_work()
> >>     }
> >> }
> >>                                  // done = HEAD2
> >>                                  done = 
> >> smp_load_acquire(&rcu_state.srs_done_tail);
> >>                                  // head = HEAD1
> >>                                  head = done->next;
> >>                                  done->next = NULL;
> >>                                  llist_for_each_safe() {
> >>                                  // completes all callbacks, release HEAD1
> >>                                  }
> >>                                }
> >>                                // Process second queue
> >>                                set_work_pool_and_clear_pending()
> >>                                rcu_sr_normal_gp_cleanup_work() {
> >>                                // done = HEAD2
> >>                                done = 
> >> smp_load_acquire(&rcu_state.srs_done_tail);
> >>
> >> // new GP, wait_tail == HEAD3
> >> rcu_sr_normal_gp_cleanup() {
> >>     // Finds HEAD2 with ->next == NULL at the end
> >>     rcu_sr_put_wait_head(HEAD2)
> >>     ...
> >>
> >> // A few more GPs later
> >> rcu_sr_normal_gp_init() {
> >>      HEAD2 = rcu_sr_get_wait_head();
> >>      llist_add(HEAD2, &rcu_state.srs_next);
> >>                                // head == rcu_state.srs_next
> >>                                head = done->next;
> >>                                done->next = NULL;
> >>                                llist_for_each_safe() {
> >>                                 // EXECUTE CALLBACKS TOO EARLY!!!
> >>                                 }
> >>                                }
> >>
> >> Reported-by: Frederic Weisbecker <frede...@kernel.org>
> >> Fixes: 05a10b921000 ("rcu: Support direct wake-up of synchronize_rcu() 
> >> users")
> >> Signed-off-by: Uladzislau Rezki (Sony) <ure...@gmail.com>
> >> ---
> >>  kernel/rcu/tree.c | 22 ++++++++--------------
> >>  1 file changed, 8 insertions(+), 14 deletions(-)
> >>
> >> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> >> index 31f3a61f9c38..475647620b12 100644
> >> --- a/kernel/rcu/tree.c
> >> +++ b/kernel/rcu/tree.c
> >> @@ -1656,21 +1656,11 @@ static void rcu_sr_normal_gp_cleanup(void)
> >>    WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail));
> >>  
> >>    /*
> >> -   * Process (a) and (d) cases. See an illustration. Apart of
> >> -   * that it handles the scenario when all clients are done,
> >> -   * wait-head is released if last. The worker is not kicked.
> >> +   * Process (a) and (d) cases. See an illustration.
> >>     */
> >>    llist_for_each_safe(rcu, next, wait_tail->next) {
> >> -          if (rcu_sr_is_wait_head(rcu)) {
> >> -                  if (!rcu->next) {
> >> -                          rcu_sr_put_wait_head(rcu);
> >> -                          wait_tail->next = NULL;
> >> -                  } else {
> >> -                          wait_tail->next = rcu;
> >> -                  }
> >> -
> >> +          if (rcu_sr_is_wait_head(rcu))
> >>                    break;
> >> -          }
> >>  
> >>            rcu_sr_normal_complete(rcu);
> >>            // It can be last, update a next on this step.
> >> @@ -1684,8 +1674,12 @@ static void rcu_sr_normal_gp_cleanup(void)
> >>    smp_store_release(&rcu_state.srs_done_tail, wait_tail);
> >>    ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_done_tail);
> >>  
> >> -  if (wait_tail->next)
> >> -          queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
> >> +  /*
> >> +   * We schedule a work in order to perform a final processing
> >> +   * of outstanding users(if still left) and releasing wait-heads
> >> +   * added by rcu_sr_normal_gp_init() call.
> >> +   */
> >> +  queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
> >>  }
> 
> One question, why do you need to queue_work() if wait_tail->next == NULL?
> 
> AFAICS, at this stage if wait_tail->next == NULL, you are in CASE f. so the 
> last
> remaining HEAD stays?  (And llist_for_each_safe() in
> rcu_sr_normal_gp_cleanup_work becomes a NOOP).
> 
> Could be something like this, but maybe I missed something:
> 
> @@ -1672,7 +1674,7 @@ static void rcu_sr_normal_gp_cleanup_work(struct
> work_struct *work)
>   */
>  static void rcu_sr_normal_gp_cleanup(void)
>  {
> -       struct llist_node *wait_tail, *next, *rcu;
> +       struct llist_node *wait_tail, *next = NULL, *rcu = NULL;
>         int done = 0;
> 
>         wait_tail = rcu_state.srs_wait_tail;
> @@ -1707,7 +1709,8 @@ static void rcu_sr_normal_gp_cleanup(void)
>          * of outstanding users(if still left) and releasing wait-heads
>          * added by rcu_sr_normal_gp_init() call.
>          */
> -       queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
> +       if (rcu)
> +               queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
>  }
> 
>  /*
>
Unneeded queueing happens only first time after boot. After one cycle
we always need to queue the work to do a previous cleanups.

if (wait_head->next)
    queue_the_work();


wait_head->next is always not NULL except first cycle after boot.

--
Uladzislau Rezki

Reply via email to