On Tue, Feb 20, 2024 at 07:31:13PM +0100, Uladzislau Rezki (Sony) wrote:
> +static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
> +{
> +     struct llist_node *done, *rcu, *next, *head;
> +
> +     /*
> +      * This work execution can potentially execute
> +      * while a new done tail is being updated by
> +      * grace period kthread in rcu_sr_normal_gp_cleanup().
> +      * So, read and updates of done tail need to
> +      * follow acq-rel semantics.
> +      *
> +      * Given that wq semantics guarantees that a single work
> +      * cannot execute concurrently by multiple kworkers,
> +      * the done tail list manipulations are protected here.
> +      */
> +     done = smp_load_acquire(&rcu_state.srs_done_tail);
> +     if (!done)
> +             return;
> +
> +     WARN_ON_ONCE(!rcu_sr_is_wait_head(done));
> +     head = done->next;
> +     done->next = NULL;

Can the following race happen?

CPU 0                                                   CPU 1
-----                                                   -----

// wait_tail == HEAD1
rcu_sr_normal_gp_cleanup() {
    // has passed SR_MAX_USERS_WAKE_FROM_GP
    wait_tail->next = next;
    // done_tail = HEAD1
    smp_store_release(&rcu_state.srs_done_tail, wait_tail);
    queue_work() {
        test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)
        __queue_work()
    }
}

                                                      
set_work_pool_and_clear_pending()
                                                      
rcu_sr_normal_gp_cleanup_work() {
// new GP, wait_tail == HEAD2
rcu_sr_normal_gp_cleanup() {
    // executes all completion, but stop at HEAD1
    wait_tail->next = HEAD1;
    // done_tail = HEAD2
    smp_store_release(&rcu_state.srs_done_tail, wait_tail);
    queue_work() {
        test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)
        __queue_work()
    }
}
                                                          // done = HEAD2
                                                          done = 
smp_load_acquire(&rcu_state.srs_done_tail);
                                                          // head = HEAD1
                                                          head = done->next;
                                                          done->next = NULL;
                                                          llist_for_each_safe() 
{
                                                              // completes all 
callbacks, release HEAD1
                                                          }
                                                      }
                                                      // Process second queue
                                                      
set_work_pool_and_clear_pending()
                                                      
rcu_sr_normal_gp_cleanup_work() {
                                                          // done = HEAD2
                                                          done = 
smp_load_acquire(&rcu_state.srs_done_tail);

// new GP, wait_tail == HEAD3
rcu_sr_normal_gp_cleanup() {
    // Finds HEAD2 with ->next == NULL at the end
    rcu_sr_put_wait_head(HEAD2)
    ...

// A few more GPs later
rcu_sr_normal_gp_init() {
     HEAD2 = rcu_sr_get_wait_head();
     llist_add(HEAD2, &rcu_state.srs_next);
                                                          // head == 
rcu_state.srs_next
                                                          head = done->next;
                                                          done->next = NULL;
                                                          llist_for_each_safe() 
{
                                                              // EXECUTE 
CALLBACKS TOO EARLY!!!
                                                          }
                                                      }

Reply via email to