Currently, a single thread workqueue has only one pwq, so all
works are queued to the same worker pool. This is not optimal on
NUMA machines, as it causes workers to jump around across nodes.

This patch adds a new wq flag, __WQ_DYNAMIC, and a new macro,
create_singlethread_dynamic_workqueue(). This new kind of
single thread workqueue creates a separate pwq covering the
intersecting CPUs for each NUMA node which has online CPUs
in @attrs->cpumask, instead of mapping all entries of
numa_pwq_tbl[] to the same pwq. After this, we can specify the
@cpu of queue_work_on(), so the work is executed on the same
NUMA node as the specified @cpu.
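
A minimal usage sketch (the workqueue name, the work item, the handler
example_work_fn and the way @cpu is chosen are illustrative only, not
part of this patch):

	static struct workqueue_struct *example_wq;
	static struct work_struct example_work;

	/* create a dynamic single thread workqueue */
	example_wq = create_singlethread_dynamic_workqueue("example");
	if (!example_wq)
		return -ENOMEM;

	INIT_WORK(&example_work, example_work_fn);

	/*
	 * With __WQ_DYNAMIC, the work is queued to the pwq covering the
	 * NUMA node of @cpu, so the worker stays on that node.
	 */
	queue_work_on(cpu, example_wq, &example_work);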

Signed-off-by: Biaoxiang Ye <yebiaoxi...@huawei.com>
---
 include/linux/workqueue.h |  7 +++++++
 kernel/workqueue.c        | 40 ++++++++++++++++++++++++++++++++++------
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index b7c585b..b2e8121 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -106,6 +106,7 @@ struct work_struct {
 #ifdef CONFIG_LOCKDEP
        struct lockdep_map lockdep_map;
 #endif
+       int intimate_cpu;
 };
 
 #define WORK_DATA_INIT()       ATOMIC_LONG_INIT((unsigned long)WORK_STRUCT_NO_POOL)
@@ -235,6 +236,7 @@ static inline void destroy_delayed_work_on_stack(struct delayed_work *work) { }
                lockdep_init_map(&(_work)->lockdep_map, "(work_completion)"#_work, &__key, 0); \
                INIT_LIST_HEAD(&(_work)->entry);                        \
                (_work)->func = (_func);                                \
+               (_work)->intimate_cpu = -1;                     \
        } while (0)
 #else
 #define __INIT_WORK(_work, _func, _onstack)                            \
@@ -243,6 +245,7 @@ static inline void destroy_delayed_work_on_stack(struct delayed_work *work) { }
                (_work)->data = (atomic_long_t) WORK_DATA_INIT();       \
                INIT_LIST_HEAD(&(_work)->entry);                        \
                (_work)->func = (_func);                                \
+               (_work)->intimate_cpu = -1;                     \
        } while (0)
 #endif
 
@@ -344,6 +347,7 @@ enum {
        __WQ_ORDERED            = 1 << 17, /* internal: workqueue is ordered */
        __WQ_LEGACY             = 1 << 18, /* internal: create*_workqueue() */
        __WQ_ORDERED_EXPLICIT   = 1 << 19, /* internal: alloc_ordered_workqueue() */
+       __WQ_DYNAMIC            = 1 << 20, /* internal: workqueue is dynamic  */
 
        WQ_MAX_ACTIVE           = 512,    /* I like 512, better ideas? */
        WQ_MAX_UNBOUND_PER_CPU  = 4,      /* 4 * #cpus for unbound wq */
@@ -432,6 +436,9 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
                        WQ_MEM_RECLAIM, 1, (name))
 #define create_singlethread_workqueue(name)                            \
        alloc_ordered_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, name)
+#define create_singlethread_dynamic_workqueue(name)     \
+       alloc_ordered_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM | \
+                       __WQ_DYNAMIC, name)
 
 extern void destroy_workqueue(struct workqueue_struct *wq);
 
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 601d611..418081c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -1395,8 +1395,11 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
                         struct work_struct *work)
 {
        struct pool_workqueue *pwq;
+       struct pool_workqueue *last_pwq;
        struct worker_pool *last_pool;
        struct list_head *worklist;
+       struct work_struct *work_tmp;
+       bool pending = false;    /* is work pending in last worker pool */
        unsigned int work_flags;
        unsigned int req_cpu = cpu;
 
@@ -1441,7 +1444,27 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
                if (worker && worker->current_pwq->wq == wq) {
                        pwq = worker->current_pwq;
                } else {
-                       /* meh... not running there, queue here */
+                       /*
+                        * meh... not running there, queue here
+                        * We can't break the ordering guarantee of a dynamic single thread wq,
+                        * so check whether the work is still pending in the last pool or not.
+                        */
+                       if (wq->flags & __WQ_DYNAMIC) {
+                               list_for_each_entry(work_tmp, &last_pool->worklist, entry) {
+                                       if (work_tmp == work) {
+                                               pending = true;
+                                               break;
+                                       }
+                               }
+                               if (pending) {
+                                       last_pwq = get_work_pwq(work);
+                                       if (likely(last_pwq))
+                                               pwq = last_pwq;
+                                       else    /* queue here */
+                                               pr_warn("workqueue: work pending in last pool, "
+                                                               "but can't get pwq.\n");
+                               }
+                       }
                        spin_unlock(&last_pool->lock);
                        spin_lock(&pwq->pool->lock);
                }
@@ -3906,6 +3929,9 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
         * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
         */
        copy_workqueue_attrs(new_attrs, attrs);
+       if (wq->flags & __WQ_DYNAMIC)
+               new_attrs->no_numa = false;
+
        cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
        if (unlikely(cpumask_empty(new_attrs->cpumask)))
                cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
@@ -4154,10 +4180,12 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
                return 0;
        } else if (wq->flags & __WQ_ORDERED) {
                ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
-               /* there should only be single pwq for ordering guarantee */
-               WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
-                             wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
-                    "ordering guarantee broken for workqueue %s\n", wq->name);
+               if (!(wq->flags & __WQ_DYNAMIC)) {
+                       /* there should only be single pwq for ordering guarantee */
+                       WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
+                                       wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
+                                       "ordering guarantee broken for workqueue %s\n", wq->name);
+               }
                return ret;
        } else {
                return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
@@ -5217,7 +5245,7 @@ static int workqueue_apply_unbound_cpumask(void)
                if (!(wq->flags & WQ_UNBOUND))
                        continue;
                /* creating multiple pwqs breaks ordering guarantee */
-               if (wq->flags & __WQ_ORDERED)
+               if ((wq->flags & __WQ_ORDERED) && !(wq->flags & __WQ_DYNAMIC))
                        continue;
 
                ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs);
-- 
1.8.3.1

