On Tue, Apr 28, 2020 at 10:58:58PM +0200, Uladzislau Rezki (Sony) wrote:
> Update kvfree_call_rcu() with head-less support, meaning
> an object without an rcu_head structure can be reclaimed
> after a grace period (GP).
> 
> Pointers are stored in two chain-arrays: one for SLAB
> objects and another for vmalloc ones. Both object types
> (the head-less variant and the regular one) are placed
> there according to their type.
> 
> Under high memory pressure, maintaining the arrays can
> become impossible. For that reason there is an emergency
> path: objects with an rcu_head inside are simply queued
> on a singly linked list, which is drained later.
> 
> As for the head-less variant: such objects have no
> rcu_head helper inside, so one is attached dynamically.
> The resulting allocation consists of a back-pointer and
> a regular rcu_head. Such objects are tagged, so the
> emergency path can detect them and free the back-pointer
> target as well as the dynamically attached wrapper.
> 
> Even though this approach requires dynamic memory, it
> needs only sizeof(unsigned long *) + sizeof(struct rcu_head)
> bytes, so SLAB is used to obtain it. Finally, if attaching
> the rcu_head and queuing fail, the current context has to
> honor the might_sleep() annotation, so the steps below
> can be applied:
>    a) wait until a grace period has elapsed;
>    b) inline the kvfree() call directly.
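
For reference, here is the wrapper described above written out as an
equivalent struct. This is a sketch only: the patch open-codes the
layout via kmalloc(sizeof(unsigned long *) + sizeof(struct rcu_head), ...)
and no such struct exists in the code.

	/* Hypothetical view of the dynamically attached wrapper. */
	struct headless_wrapper {
		unsigned long *back_ptr;	/* object passed to kvfree_rcu() */
		struct rcu_head rh;		/* queued on the emergency list */
	};

The pointer queued on the emergency list is &wrapper->rh, which is why
the free path has to recover and kvfree() both the back-pointer target
and the wrapper itself.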
> 
> Reviewed-by: Joel Fernandes (Google) <[email protected]>
> Co-developed-by: Joel Fernandes (Google) <[email protected]>
> Signed-off-by: Joel Fernandes (Google) <[email protected]>
> Signed-off-by: Uladzislau Rezki (Sony) <[email protected]>
> ---
>  kernel/rcu/tree.c | 102 ++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 98 insertions(+), 4 deletions(-)
> 
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 51726e4c3b4d..501cac02146d 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -3072,15 +3072,31 @@ static void kfree_rcu_work(struct work_struct *work)
>        */
>       for (; head; head = next) {
>               unsigned long offset = (unsigned long)head->func;
> -             void *ptr = (void *)head - offset;
> +             bool headless;
> +             void *ptr;
>  
>               next = head->next;
> +
> +             /* Headless objects are tagged; if tagged, adjust the offset. */
> +             headless = (((unsigned long) head - offset) & BIT(0));
> +             if (headless)
> +                     offset -= 1;
> +
> +             ptr = (void *) head - offset;
> +
>               debug_rcu_head_unqueue((struct rcu_head *)ptr);
>               rcu_lock_acquire(&rcu_callback_map);
>               trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
>  
> -             if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
> +             if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) {
> +                     /*
> +                      * If headless, free the back-pointer target first.
> +                      */
> +                     if (headless)
> +                             kvfree((void *) *((unsigned long *) ptr));
> +
>                       kvfree(ptr);
> +             }
>  
>               rcu_lock_release(&rcu_callback_map);
>               cond_resched_tasks_rcu_qs();
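
For reference, the arithmetic behind the BIT(0) tag above, as I read
it (assuming a 64-bit kernel, where kmalloc() memory is at least
8-byte aligned so a real pointer never has BIT(0) set):

	wrapper = kmalloc(...);		/* 8-byte aligned, BIT(0) clear */
	head    = wrapper + 8;		/* rcu_head sits after back-ptr */
	offset  = 8 + 1;		/* tagged value stored in ->func */

	head - offset == wrapper - 1	/* BIT(0) set: headless object  */
	offset - 1    == 8		/* real offset back to wrapper  */

For a regular object the offset is the distance to an embedded
rcu_head, so head - offset is the aligned object itself and BIT(0)
stays clear.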
> @@ -3221,6 +3237,13 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
>                       if (IS_ENABLED(CONFIG_PREEMPT_RT))
>                               return false;
>  
> +                     /*
> +                      * TODO: For the one-argument kvfree_rcu() we can
> +                      * drop the lock and get the page in a sleepable
> +                      * context. That would allow us to maintain an
> +                      * array for CONFIG_PREEMPT_RT as well, and thus
> +                      * to get rid of the dynamic rcu_head attaching
> +                      * code.
> +                      */
>                       bnode = (struct kvfree_rcu_bulk_data *)
>                               __get_free_page(GFP_NOWAIT | __GFP_NOWARN);
>               }
> @@ -3244,6 +3267,23 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
>       return true;
>  }
>  
> +static inline struct rcu_head *
> +attach_rcu_head_to_object(void *obj)
> +{
> +     unsigned long *ptr;
> +
> +     ptr = kmalloc(sizeof(unsigned long *) +
> +                     sizeof(struct rcu_head), GFP_NOWAIT |
> +                             __GFP_RECLAIM | /* can do direct reclaim. */
> +                             __GFP_NORETRY | /* only lightweight one.  */
> +                             __GFP_NOWARN);  /* no failure reports. */

Again, let's please not do this single-pointer-sized allocation.  If
a full page is not available and this is a single-argument kfree_rcu(),
just call synchronize_rcu() and then free the object directly.

It should not be -that- hard to adjust locking for CONFIG_PREEMPT_RT!
For example, have some kind of reservation protocol so that a task
that drops the lock can retry the page allocation and be sure of having
a place to put it.  This might entail making CONFIG_PREEMPT_RT reserve
more pages per CPU.  Or maybe that would not be necessary.
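
Something along these lines, perhaps (a rough sketch only; the
spare_bnode field and its handling below are made up and exist
nowhere in the patch):

	/* Claim the per-CPU spare page under the lock... */
	bnode = krcp->spare_bnode;
	krcp->spare_bnode = NULL;
	krc_this_cpu_unlock(krcp, flags);

	/* ...retry the allocation in sleepable context... */
	new = (struct kvfree_rcu_bulk_data *)
		__get_free_page(GFP_KERNEL | __GFP_NOWARN);

	/* ...and put the reservation back once the lock is re-taken. */
	krcp = krc_this_cpu_lock(&flags);
	if (!krcp->spare_bnode)
		krcp->spare_bnode = new;
	else if (new)
		free_page((unsigned long)new);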

                                                        Thanx, Paul

> +     if (!ptr)
> +             return NULL;
> +
> +     ptr[0] = (unsigned long) obj;
> +     return ((struct rcu_head *) ++ptr);
> +}
> +
>  /*
>   * Queue a request for lazy invocation of appropriate free routine after a
>   * grace period. Please note there are three paths maintained, two are the
> @@ -3260,16 +3300,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>  {
>       unsigned long flags;
>       struct kfree_rcu_cpu *krcp;
> +     bool success;
>       void *ptr;
>  
> +     if (head) {
> +             ptr = (void *) head - (unsigned long) func;
> +     } else {
> +             /*
> +              * Please note the limitation of the head-less variant:
> +              * it can be used from a might_sleep() context only.
> +              * For other places, please embed an rcu_head into
> +              * your data.
> +              */
> +             might_sleep();
> +             ptr = (unsigned long *) func;
> +     }
> +
>       krcp = krc_this_cpu_lock(&flags);
> -     ptr = (void *)head - (unsigned long)func;
>  
>       /* Queue the object but don't yet schedule the batch. */
>       if (debug_rcu_head_queue(ptr)) {
>               /* Probable double kfree_rcu(), just leak. */
>               WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
>                         __func__, head);
> +
> +             /* Mark as success and leave. */
> +             success = true;
>               goto unlock_return;
>       }
>  
> @@ -3277,10 +3335,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>        * Under high memory pressure GFP_NOWAIT can fail,
>        * in that case the emergency path is maintained.
>        */
> -     if (unlikely(!kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr))) {
> +     success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
> +     if (!success) {
> +             if (head == NULL) {
> +                     /*
> +                      * A headless (one-argument kvfree_rcu()) caller
> +                      * can sleep. Drop the lock and take it back, so
> +                      * the allocation can do direct lightweight reclaim.
> +                      */
> +                     krc_this_cpu_unlock(krcp, flags);
> +                     head = attach_rcu_head_to_object(ptr);
> +                     krcp = krc_this_cpu_lock(&flags);
> +
> +                     if (head == NULL)
> +                             goto unlock_return;
> +
> +                     /*
> +                      * Tag the headless object. Such objects have a
> +                      * back-pointer to the originally allocated memory,
> +                      * which has to be freed as well as the dynamically
> +                      * attached wrapper/head.
> +                      */
> +                     func = (rcu_callback_t) (sizeof(unsigned long *) + 1);
> +             }
> +
>               head->func = func;
>               head->next = krcp->head;
>               krcp->head = head;
> +             success = true;
>       }
>  
>       WRITE_ONCE(krcp->count, krcp->count + 1);
> @@ -3294,6 +3376,18 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>  
>  unlock_return:
>       krc_this_cpu_unlock(krcp, flags);
> +
> +     /*
> +      * Under high memory pressure, inline kvfree() after
> +      * synchronize_rcu(). This can be done only from a
> +      * might_sleep() context, so the current CPU can pass
> +      * a quiescent state.
> +      */
> +     if (!success) {
> +             debug_rcu_head_unqueue(ptr);
> +             synchronize_rcu();
> +             kvfree(ptr);
> +     }
>  }
>  EXPORT_SYMBOL_GPL(kvfree_call_rcu);
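
For completeness, here is what the two calling conventions look like
from the caller's side (assuming the kvfree_rcu() wrapper macros from
the rest of this series; struct and variable names are made up):

	struct foo {
		int data;
		struct rcu_head rh;	/* needed only for the two-argument form */
	};

	struct foo *p = ..., *q = ...;

	/* Two-argument form: any context, no allocation needed. */
	kvfree_rcu(p, rh);

	/* One-argument (head-less) form: might_sleep() context only;
	 * may attach an rcu_head dynamically or fall back to
	 * synchronize_rcu() plus a direct kvfree().
	 */
	kvfree_rcu(q);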
>  
> -- 
> 2.20.1
> 
