On Thu, May 24, 2018 at 11:04:40AM +0800, Ming Lei wrote:
> When the allocation process is scheduled back and the mapped hw queue has
> changed, fake one extra wake up on the previous queue to compensate for the
> missed wake up, so other allocations on the previous queue won't be starved.
> 
> This patch fixes a request allocation hang which can be triggered easily
> when nr_requests is very low.

Thanks, Ming, this looks better. One comment below.
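
For anyone following along, here's my reading of the race being fixed, as a
rough sketch rather than the literal code (the real accounting happens via
ws->wait_cnt and wake_batch, and the migration comments are mine):

        /* waiter, in blk_mq_get_tag(): */
        ws = bt_wait_ptr(bt, data->hctx);    /* waitqueue of the old hctx */
        prepare_to_wait(&ws->wait, &wait, TASK_UNINTERRUPTIBLE);
        io_schedule();                       /* migrated while asleep */

        data->ctx = blk_mq_get_ctx(data->q); /* re-map after waking */
        data->hctx = blk_mq_map_queue(data->q, data->ctx->cpu);
        bt = &tags->bitmap_tags;             /* a different sbitmap_queue now */

        /*
         * The wake up that roused this task was charged against the old
         * queue's wake batch, but the task allocates (and later frees) a
         * tag on the new queue. With a very low nr_requests, the waiters
         * left on the old queue may never see another wake up.
         */

The fake sbitmap_queue_wake_up() on bt_prev replays that lost wake up.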

> Cc: <[email protected]>
> Cc: Omar Sandoval <[email protected]>
> Signed-off-by: Ming Lei <[email protected]>
> ---
> V3:
>       - fix comments as suggested by Jens
>       - remove the wrapper as suggested by Omar
> V2:
>       - fix build failure
> 
> 
>  block/blk-mq-tag.c      | 12 ++++++++++++
>  include/linux/sbitmap.h |  7 +++++++
>  lib/sbitmap.c           | 22 ++++++++++++----------
>  3 files changed, 31 insertions(+), 10 deletions(-)
> 
> diff --git a/block/blk-mq-tag.c b/block/blk-mq-tag.c
> index 336dde07b230..a4e58fc28a06 100644
> --- a/block/blk-mq-tag.c
> +++ b/block/blk-mq-tag.c
> @@ -134,6 +134,8 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
>       ws = bt_wait_ptr(bt, data->hctx);
>       drop_ctx = data->ctx == NULL;
>       do {
> +             struct sbitmap_queue *bt_prev;
> +
>               /*
>                * We're out of tags on this hardware queue, kick any
>                * pending IO submits before going to sleep waiting for
> @@ -159,6 +161,7 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
>               if (data->ctx)
>                       blk_mq_put_ctx(data->ctx);
>  
> +             bt_prev = bt;
>               io_schedule();
>  
>               data->ctx = blk_mq_get_ctx(data->q);
> @@ -170,6 +173,15 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
>                       bt = &tags->bitmap_tags;
>  
>               finish_wait(&ws->wait, &wait);
> +
> +             /*
> +              * If the destination hw queue has changed, fake a wake up on
> +              * the previous queue to compensate for the missed wake up, so
> +              * other allocations on the previous queue won't be starved.
> +              */
> +             if (bt != bt_prev)
> +                     sbitmap_queue_wake_up(bt_prev);
> +
>               ws = bt_wait_ptr(bt, data->hctx);
>       } while (1);
>  
> diff --git a/include/linux/sbitmap.h b/include/linux/sbitmap.h
> index 841585f6e5f2..bba9d80191b7 100644
> --- a/include/linux/sbitmap.h
> +++ b/include/linux/sbitmap.h
> @@ -484,6 +484,13 @@ static inline struct sbq_wait_state *sbq_wait_ptr(struct sbitmap_queue *sbq,
>  void sbitmap_queue_wake_all(struct sbitmap_queue *sbq);
>  
>  /**
> + * sbitmap_queue_wake_up() - Wake up some of the waiters in one waitqueue
> + * on a &struct sbitmap_queue.
> + * @sbq: Bitmap queue to wake up.
> + */
> +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq);
> +
> +/**
>   * sbitmap_queue_show() - Dump &struct sbitmap_queue information to a &struct
>   * seq_file.
>   * @sbq: Bitmap queue to show.
> diff --git a/lib/sbitmap.c b/lib/sbitmap.c
> index e6a9c06ec70c..14e027a33ffa 100644
> --- a/lib/sbitmap.c
> +++ b/lib/sbitmap.c
> @@ -335,8 +335,9 @@ void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
>       if (sbq->wake_batch != wake_batch) {
>               WRITE_ONCE(sbq->wake_batch, wake_batch);
>               /*
> -              * Pairs with the memory barrier in sbq_wake_up() to ensure that
> -              * the batch size is updated before the wait counts.
> +              * Pairs with the memory barrier in sbitmap_queue_wake_up()
> +              * to ensure that the batch size is updated before the wait
> +              * counts.
>                */
>               smp_mb__before_atomic();
>               for (i = 0; i < SBQ_WAIT_QUEUES; i++)
> @@ -425,7 +426,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
>       return NULL;
>  }
>  
> -static void sbq_wake_up(struct sbitmap_queue *sbq)
> +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
>  {
>       struct sbq_wait_state *ws;
>       unsigned int wake_batch;

Just after this is:

        /*
         * Pairs with the memory barrier in set_current_state() to ensure the
         * proper ordering of clear_bit()/waitqueue_active() in the waker and
         * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
         * waiter. See the comment on waitqueue_active(). This is __after_atomic
         * because we just did clear_bit_unlock() in the caller.
         */
        smp_mb__after_atomic();

But there's no atomic operation before this in blk_mq_get_tag(). I don't
think this memory barrier is necessary for the blk_mq_get_tag() case, so
let's move it to sbitmap_queue_clear():

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 14e027a33ffa..537328a98c06 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -432,15 +432,6 @@ void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
        unsigned int wake_batch;
        int wait_cnt;
 
-       /*
-        * Pairs with the memory barrier in set_current_state() to ensure the
-        * proper ordering of clear_bit()/waitqueue_active() in the waker and
-        * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
-        * waiter. See the comment on waitqueue_active(). This is __after_atomic
-        * because we just did clear_bit_unlock() in the caller.
-        */
-       smp_mb__after_atomic();
-
        ws = sbq_wake_ptr(sbq);
        if (!ws)
                return;
@@ -472,6 +463,13 @@ void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
                         unsigned int cpu)
 {
        sbitmap_clear_bit_unlock(&sbq->sb, nr);
+       /*
+        * Pairs with the memory barrier in set_current_state() to ensure the
+        * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker
+        * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
+        * waiter. See the comment on waitqueue_active().
+        */
+       smp_mb__after_atomic();
        sbitmap_queue_wake_up(sbq);
        if (likely(!sbq->round_robin && nr < sbq->sb.depth))
                *per_cpu_ptr(sbq->alloc_hint, cpu) = nr;

That comment also mentioned clear_bit() but we do clear_bit_unlock()
now, so we can update that at the same time.
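
To make the pairing concrete after the move, the ordering we rely on is
roughly this (a sketch, not the literal code; the A-D labels are mine):

        /* waker, sbitmap_queue_clear(): */
        sbitmap_clear_bit_unlock(&sbq->sb, nr); /* A: free the tag bit */
        smp_mb__after_atomic();                 /* order A before B */
        sbitmap_queue_wake_up(sbq);             /* B: check the waitqueue and
                                                   wake waiters if the batch
                                                   is exhausted */

        /* waiter, blk_mq_get_tag(): */
        prepare_to_wait(&ws->wait, &wait, TASK_UNINTERRUPTIBLE);
                                                /* C: full barrier via
                                                   set_current_state() */
        tag = __blk_mq_get_tag(data, bt);       /* D: retry the bitmap */
        if (tag == -1)
                io_schedule();

A before B on the waker and C before D on the waiter guarantee that either
B observes the waiter and wakes it, or D observes the bit freed by A, so a
waiter can't sleep forever on a tag that's already free. The fake wake up
path in blk_mq_get_tag() doesn't clear any bit first, which is why the
barrier doesn't belong in sbitmap_queue_wake_up() itself.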

> @@ -454,23 +455,24 @@ static void sbq_wake_up(struct sbitmap_queue *sbq)
>                */
>               smp_mb__before_atomic();
>               /*
> -              * If there are concurrent callers to sbq_wake_up(), the last
> -              * one to decrement the wait count below zero will bump it back
> -              * up. If there is a concurrent resize, the count reset will
> -              * either cause the cmpxchg to fail or overwrite after the
> -              * cmpxchg.
> +              * If there are concurrent callers to sbitmap_queue_wake_up(),
> +              * the last one to decrement the wait count below zero will
> +              * bump it back up. If there is a concurrent resize, the count
> +              * reset will either cause the cmpxchg to fail or overwrite
> +              * after the cmpxchg.
>                */
>               atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
>               sbq_index_atomic_inc(&sbq->wake_index);
>               wake_up_nr(&ws->wait, wake_batch);
>       }
>  }
> +EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
>  
>  void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
>                        unsigned int cpu)
>  {
>       sbitmap_clear_bit_unlock(&sbq->sb, nr);
> -     sbq_wake_up(sbq);
> +     sbitmap_queue_wake_up(sbq);
>       if (likely(!sbq->round_robin && nr < sbq->sb.depth))
>               *per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
>  }
> @@ -482,7 +484,7 @@ void sbitmap_queue_wake_all(struct sbitmap_queue *sbq)
>  
>       /*
>        * Pairs with the memory barrier in set_current_state() like in
> -      * sbq_wake_up().
> +      * sbitmap_queue_wake_up().
>        */
>       smp_mb();
>       wake_index = atomic_read(&sbq->wake_index);
> -- 
> 2.9.5
> 
