On 8/1/24 13:10, Uladzislau Rezki (Sony) wrote:
> Add a kvfree_rcu_barrier() function. It waits until all
> in-flight pointers are freed over RCU machinery. It does
> not wait any GP completion and it is within its right to
> return immediately if there are no outstanding pointers.
> 
> This function is useful when there is a need to guarantee
> that a memory is fully freed before destroying memory caches.
> For example, during unloading a kernel module.
> 
> Signed-off-by: Uladzislau Rezki (Sony) <[email protected]>

Thanks a lot, Ulad! Locally it makde the kunit test stop failing and for now
I've pushed it as part of my series for bots to test:

https://git.kernel.org/pub/scm/linux/kernel/git/vbabka/linux.git/log/?h=slab-kfree_rcu-destroy-v2r0

I hope after RCU folks review I can carry this in my series and later the
slab git tree so we don't have inter-tree dependencies. Thanks!

Vlastimil

> ---
>  include/linux/rcutiny.h |   5 ++
>  include/linux/rcutree.h |   1 +
>  kernel/rcu/tree.c       | 103 ++++++++++++++++++++++++++++++++++++----
>  3 files changed, 101 insertions(+), 8 deletions(-)
> 
> diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
> index d9ac7b136aea..522123050ff8 100644
> --- a/include/linux/rcutiny.h
> +++ b/include/linux/rcutiny.h
> @@ -111,6 +111,11 @@ static inline void __kvfree_call_rcu(struct rcu_head 
> *head, void *ptr)
>       kvfree(ptr);
>  }
>  
> +static inline void kvfree_rcu_barrier(void)
> +{
> +     rcu_barrier();
> +}
> +
>  #ifdef CONFIG_KASAN_GENERIC
>  void kvfree_call_rcu(struct rcu_head *head, void *ptr);
>  #else
> diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
> index 254244202ea9..58e7db80f3a8 100644
> --- a/include/linux/rcutree.h
> +++ b/include/linux/rcutree.h
> @@ -35,6 +35,7 @@ static inline void rcu_virt_note_context_switch(void)
>  
>  void synchronize_rcu_expedited(void);
>  void kvfree_call_rcu(struct rcu_head *head, void *ptr);
> +void kvfree_rcu_barrier(void);
>  
>  void rcu_barrier(void);
>  void rcu_momentary_dyntick_idle(void);
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 28c7031711a3..1423013f9fe6 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -3550,18 +3550,15 @@ kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
>  }
>  
>  /*
> - * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
> + * Return: %true if a work is queued, %false otherwise.
>   */
> -static void kfree_rcu_monitor(struct work_struct *work)
> +static bool
> +kvfree_rcu_queue_batch(struct kfree_rcu_cpu *krcp)
>  {
> -     struct kfree_rcu_cpu *krcp = container_of(work,
> -             struct kfree_rcu_cpu, monitor_work.work);
>       unsigned long flags;
> +     bool queued = false;
>       int i, j;
>  
> -     // Drain ready for reclaim.
> -     kvfree_rcu_drain_ready(krcp);
> -
>       raw_spin_lock_irqsave(&krcp->lock, flags);
>  
>       // Attempt to start a new batch.
> @@ -3600,11 +3597,27 @@ static void kfree_rcu_monitor(struct work_struct 
> *work)
>                       // be that the work is in the pending state when
>                       // channels have been detached following by each
>                       // other.
> -                     queue_rcu_work(system_wq, &krwp->rcu_work);
> +                     queued = queue_rcu_work(system_wq, &krwp->rcu_work);
>               }
>       }
>  
>       raw_spin_unlock_irqrestore(&krcp->lock, flags);
> +     return queued;
> +}
> +
> +/*
> + * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
> + */
> +static void kfree_rcu_monitor(struct work_struct *work)
> +{
> +     struct kfree_rcu_cpu *krcp = container_of(work,
> +             struct kfree_rcu_cpu, monitor_work.work);
> +
> +     // Drain ready for reclaim.
> +     kvfree_rcu_drain_ready(krcp);
> +
> +     // Queue a batch for a rest.
> +     kvfree_rcu_queue_batch(krcp);
>  
>       // If there is nothing to detach, it means that our job is
>       // successfully done here. In case of having at least one
> @@ -3825,6 +3838,80 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
>  }
>  EXPORT_SYMBOL_GPL(kvfree_call_rcu);
>  
> +/**
> + * kvfree_rcu_barrier - Wait until all in-flight kvfree_rcu() complete.
> + *
> + * Note that a single argument of kvfree_rcu() call has a slow path that
> + * triggers synchronize_rcu() following by freeing a pointer. It is done
> + * before the return from the function. Therefore for any single-argument
> + * call that will result in a kfree() to a cache that is to be destroyed
> + * during module exit, it is developer's responsibility to ensure that all
> + * such calls have returned before the call to kmem_cache_destroy().
> + */
> +void kvfree_rcu_barrier(void)
> +{
> +     struct kfree_rcu_cpu_work *krwp;
> +     struct kfree_rcu_cpu *krcp;
> +     bool queued;
> +     int i, cpu;
> +
> +     /*
> +      * Firstly we detach objects and queue them over an RCU-batch
> +      * for all CPUs. Finally queued works are flushed for each CPU.
> +      *
> +      * Please note. If there are outstanding batches for a particular
> +      * CPU, those have to be finished first following by queuing a new.
> +      */
> +     for_each_possible_cpu(cpu) {
> +             krcp = per_cpu_ptr(&krc, cpu);
> +
> +             /*
> +              * Check if this CPU has any objects which have been queued for 
> a
> +              * new GP completion. If not(means nothing to detach), we are 
> done
> +              * with it. If any batch is pending/running for this "krcp", 
> below
> +              * per-cpu flush_rcu_work() waits its completion(see last step).
> +              */
> +             if (!need_offload_krc(krcp))
> +                     continue;
> +
> +             while (1) {
> +                     /*
> +                      * If we are not able to queue a new RCU work it means:
> +                      * - batches for this CPU are still in flight which 
> should
> +                      *   be flushed first and then repeat;
> +                      * - no objects to detach, because of concurrency.
> +                      */
> +                     queued = kvfree_rcu_queue_batch(krcp);
> +
> +                     /*
> +                      * Bail out, if there is no need to offload this "krcp"
> +                      * anymore. As noted earlier it can run concurrently.
> +                      */
> +                     if (queued || !need_offload_krc(krcp))
> +                             break;
> +
> +                     /* There are ongoing batches. */
> +                     for (i = 0; i < KFREE_N_BATCHES; i++) {
> +                             krwp = &(krcp->krw_arr[i]);
> +                             flush_rcu_work(&krwp->rcu_work);
> +                     }
> +             }
> +     }
> +
> +     /*
> +      * Now we guarantee that all objects are flushed.
> +      */
> +     for_each_possible_cpu(cpu) {
> +             krcp = per_cpu_ptr(&krc, cpu);
> +
> +             for (i = 0; i < KFREE_N_BATCHES; i++) {
> +                     krwp = &(krcp->krw_arr[i]);
> +                     flush_rcu_work(&krwp->rcu_work);
> +             }
> +     }
> +}
> +EXPORT_SYMBOL_GPL(kvfree_rcu_barrier);
> +
>  static unsigned long
>  kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
>  {


Reply via email to