Introduce cgroup-aware unbound worker pools. Whenever a new worker thread is
created for such a pool, create_worker() marks it for attachment to the cgroups
of the task that called alloc_workqueue(); the worker then attaches itself to
those cgroups on its first wakeup. A new worker pool is created if there is no
match in the global list of cgroup-aware worker pools.
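
A minimal usage sketch (illustrative only, not part of this patch; the
workqueue name, my_work_fn() and the init function are made up): the caller
allocates an unbound, cgroup-aware workqueue, and the workers servicing it
end up attached to the caller's cgroups.

  #include <linux/module.h>
  #include <linux/workqueue.h>

  static struct workqueue_struct *my_wq;

  static void my_work_fn(struct work_struct *work)
  {
          /* runs in a worker attached to the allocating task's cgroups */
  }
  static DECLARE_WORK(my_work, my_work_fn);

  static int __init my_init(void)
  {
          /* WQ_CGROUPS only takes effect for unbound, non-ordered wqs */
          my_wq = alloc_workqueue("my_wq", WQ_UNBOUND | WQ_CGROUPS, 0);
          if (!my_wq)
                  return -ENOMEM;

          queue_work(my_wq, &my_work);
          return 0;
  }
  module_init(my_init);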

Signed-off-by: Bandan Das <[email protected]>
---
 include/linux/workqueue.h   |   2 +
 kernel/workqueue.c          | 212 +++++++++++++++++++++++++++++++++++++++++---
 kernel/workqueue_internal.h |   4 +
 3 files changed, 204 insertions(+), 14 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index ca73c50..7afb72d 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -131,6 +131,7 @@ struct workqueue_attrs {
        int                     nice;           /* nice level */
        cpumask_var_t           cpumask;        /* allowed CPUs */
        bool                    no_numa;        /* disable NUMA affinity */
+       bool                    cg_enabled;     /* cgroups aware */
 };
 
 static inline struct delayed_work *to_delayed_work(struct work_struct *work)
@@ -308,6 +309,7 @@ enum {
         * http://thread.gmane.org/gmane.linux.kernel/1480396
         */
        WQ_POWER_EFFICIENT      = 1 << 7,
+       WQ_CGROUPS              = 1 << 8,
 
        __WQ_DRAINING           = 1 << 16, /* internal: workqueue is draining */
        __WQ_ORDERED            = 1 << 17, /* internal: workqueue is ordered */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7ff5dc7..f052d85 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -48,6 +48,7 @@
 #include <linux/nodemask.h>
 #include <linux/moduleparam.h>
 #include <linux/uaccess.h>
+#include <linux/cgroup.h>
 
 #include "workqueue_internal.h"
 
@@ -139,8 +140,18 @@ enum {
  * MD: wq_mayday_lock protected.
  */
 
+/*
+ * list of tasks that "own" the cgroups that
+ * this pool is attached to
+ */
+struct cgroup_owners {
+       struct task_struct *owner;
+       struct list_head    link;
+};
+
 /* struct worker is defined in workqueue_internal.h */
 
+
 struct worker_pool {
        spinlock_t              lock;           /* the pool lock */
        int                     cpu;            /* I: the associated cpu */
@@ -169,6 +180,8 @@ struct worker_pool {
        struct worker           *manager;       /* L: purely informational */
        struct mutex            attach_mutex;   /* attach/detach exclusion */
        struct list_head        workers;        /* A: attached workers */
+       struct list_head        cg_owners;      /* tasks using this pool */
+       struct list_head        unbound_node;   /* all cgroup aware pools */
        struct completion       *detach_completion; /* all workers detached */
 
        struct ida              worker_ida;     /* worker IDs for task name */
@@ -219,6 +232,7 @@ struct pool_workqueue {
         */
        struct work_struct      unbound_release_work;
        struct rcu_head         rcu;
+       struct task_struct      *owner; /* for cgroups */
 } __aligned(1 << WORK_STRUCT_FLAG_BITS);
 
 /*
@@ -299,6 +313,7 @@ static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
 static DEFINE_SPINLOCK(wq_mayday_lock);        /* protects wq->maydays list */
 
 static LIST_HEAD(workqueues);          /* PR: list of all workqueues */
+static LIST_HEAD(unbound_cgpool);       /* list of cgroup aware worker pools */
 static bool workqueue_freezing;                /* PL: have wqs started freezing? */
 
 /* PL: allowable cpus for unbound wqs and work items */
@@ -425,6 +440,12 @@ static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
                if (({ assert_rcu_or_wq_mutex(wq); false; })) { }       \
                else
 
+#define for_each_unbound_cgpool(pool)                                   \
+       list_for_each_entry_rcu((pool), &(unbound_cgpool), unbound_node)
+
+#define for_each_task_cgpool(cgtask, pool)                              \
+       list_for_each_entry_rcu((cgtask), &(pool)->cg_owners, link)
+
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
 
 static struct debug_obj_descr work_debug_descr;
@@ -700,6 +721,7 @@ static struct pool_workqueue *get_work_pwq(struct work_struct *work)
  *
  * Return: The worker_pool @work was last associated with.  %NULL if none.
  */
+
 static struct worker_pool *get_work_pool(struct work_struct *work)
 {
        unsigned long data = atomic_long_read(&work->data);
@@ -757,6 +779,7 @@ static bool work_is_canceling(struct work_struct *work)
  * they're being called with pool->lock held.
  */
 
+
 static bool __need_more_worker(struct worker_pool *pool)
 {
        return !atomic_read(&pool->nr_running);
@@ -1072,6 +1095,7 @@ static void get_pwq(struct pool_workqueue *pwq)
 static void put_pwq(struct pool_workqueue *pwq)
 {
        lockdep_assert_held(&pwq->pool->lock);
+
        if (likely(--pwq->refcnt))
                return;
        if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
@@ -1387,6 +1411,9 @@ retry:
        /* pwq which will be used unless @work is executing elsewhere */
        if (!(wq->flags & WQ_UNBOUND))
                pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+       else if (wq->flags & WQ_CGROUPS)
+               /* use the default pwq */
+               pwq = unbound_pwq_by_node(wq, NUMA_NO_NODE);
        else
                pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
 
@@ -1674,6 +1701,8 @@ static struct worker *alloc_worker(int node)
                /* on creation a worker is in !idle && prep state */
                worker->flags = WORKER_PREP;
        }
+       worker->attach_pending = false;
+       worker->attach_to = NULL;
        return worker;
 }
 
@@ -1695,7 +1724,8 @@ static void worker_attach_to_pool(struct worker *worker,
         * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
         * online CPUs.  It'll be re-applied when any of the CPUs come up.
         */
-       set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
+       if (!pool->attrs->cg_enabled)
+               set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
 
        /*
         * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
@@ -1760,6 +1790,7 @@ static struct worker *create_worker(struct worker_pool *pool)
        if (id < 0)
                goto fail;
 
+       /* Note: if user specified cgroups, node is NUMA_NO_NODE */
        worker = alloc_worker(pool->node);
        if (!worker)
                goto fail;
@@ -1779,7 +1810,11 @@ static struct worker *create_worker(struct worker_pool *pool)
                goto fail;
 
        set_user_nice(worker->task, pool->attrs->nice);
-       kthread_bind_mask(worker->task, pool->attrs->cpumask);
+       if (pool->attrs->cg_enabled) {
+               worker->attach_pending = true;
+               worker->attach_to = current;
+       } else
+               kthread_bind_mask(worker->task, pool->attrs->cpumask);
 
        /* successful, attach the worker to the pool */
        worker_attach_to_pool(worker, pool);
@@ -2172,6 +2207,7 @@ static int worker_thread(void *__worker)
 {
        struct worker *worker = __worker;
        struct worker_pool *pool = worker->pool;
+       int cgattach;
 
        /* tell the scheduler that this is a workqueue worker */
        worker->task->flags |= PF_WQ_WORKER;
@@ -2191,6 +2227,14 @@ woke_up:
                return 0;
        }
 
+       /* runs only once, on the worker's first wakeup, to attach to cgroups */
+       if (worker->attach_pending) {
+               cgattach = cgroup_attach_task_all(worker->attach_to, current);
+               if (cgattach)
+                       pr_warn("workqueue: worker cgroup attach failed but we 
will still run!");
+               worker->attach_pending = false;
+       }
+
        worker_leave_idle(worker);
 recheck:
        /* no more worker necessary? */
@@ -3181,6 +3225,7 @@ static int init_worker_pool(struct worker_pool *pool)
        pool->watchdog_ts = jiffies;
        INIT_LIST_HEAD(&pool->worklist);
        INIT_LIST_HEAD(&pool->idle_list);
+       INIT_LIST_HEAD(&pool->cg_owners);
        hash_init(pool->busy_hash);
 
        init_timer_deferrable(&pool->idle_timer);
@@ -3251,13 +3296,22 @@ static void put_unbound_pool(struct worker_pool *pool)
 
        /* sanity checks */
        if (WARN_ON(!(pool->cpu < 0)) ||
-           WARN_ON(!list_empty(&pool->worklist)))
+           WARN_ON(!list_empty(&pool->worklist)) ||
+           WARN_ON(!list_empty(&pool->cg_owners)))
                return;
 
        /* release id and unhash */
        if (pool->id >= 0)
                idr_remove(&worker_pool_idr, pool->id);
-       hash_del(&pool->hash_node);
+
+       /*
+        * this pool is going down, so remove from the list of
+        * cgroup aware pools
+        */
+       if (pool->attrs->cg_enabled)
+               list_del(&pool->unbound_node);
+       else
+               hash_del(&pool->hash_node);
 
        /*
         * Become the manager and destroy all workers.  Grabbing
@@ -3290,6 +3344,65 @@ static void put_unbound_pool(struct worker_pool *pool)
        call_rcu_sched(&pool->rcu, rcu_free_pool);
 }
 
+static void remove_task_cgpool(struct worker_pool *pool,
+                              struct task_struct *tsk)
+{
+       struct cgroup_owners *iter;
+
+       if (pool->attrs->cg_enabled) {
+               for_each_task_cgpool(iter, pool) {
+                       if (iter->owner == tsk) {
+                               list_del(&iter->link);
+                               break;
+                       }
+               }
+       }
+}
+
+static bool attach_task_cgpool(struct worker_pool *pool,
+                              struct task_struct *tsk)
+{
+       bool result = true;
+       struct cgroup_owners *entry = kzalloc(sizeof(*entry), GFP_KERNEL);
+
+       if (!entry) {
+               result = false;
+               goto done;
+       }
+
+       entry->owner = tsk;
+       list_add_tail(&entry->link, &pool->cg_owners);
+
+done:
+       return result;
+}
+
+static struct worker_pool *find_cg_matching_pool(struct task_struct *tsk)
+{
+       struct worker_pool *pool = NULL, *iter;
+       bool found = false;
+
+       for_each_unbound_cgpool(iter) {
+               struct cgroup_owners *cgtask;
+
+               for_each_task_cgpool(cgtask, iter) {
+                       if (cgtask->owner == tsk ||
+                           cgroup_match_groups(cgtask->owner, tsk)) {
+                               found = true;
+                               break;
+                       }
+               }
+
+               if (found) {
+                       pool = iter;
+                       pool->refcnt++;
+                       break;
+               }
+       }
+
+       return pool;
+}
+
 /**
  * get_unbound_pool - get a worker_pool with the specified attributes
  * @attrs: the attributes of the worker_pool to get
@@ -3310,9 +3423,19 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
        struct worker_pool *pool;
        int node;
        int target_node = NUMA_NO_NODE;
+       bool cgroups_enabled = attrs->cg_enabled;
 
        lockdep_assert_held(&wq_pool_mutex);
 
+       if (cgroups_enabled) {
+               /* "current" is the owner */
+               pool = find_cg_matching_pool(current);
+               if (!pool)
+                       goto create;
+               else
+                       return pool;
+       }
+
        /* do we already have a matching pool? */
        hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
                if (wqattrs_equal(pool->attrs, attrs)) {
@@ -3332,6 +3455,8 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
                }
        }
 
+
+create:
        /* nope, create a new one */
        pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
        if (!pool || init_worker_pool(pool) < 0)
@@ -3347,6 +3472,9 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
         */
        pool->attrs->no_numa = false;
 
+       if (cgroups_enabled)
+               pool->attrs->cg_enabled = true;
+
        if (worker_pool_assign_id(pool) < 0)
                goto fail;
 
@@ -3355,7 +3483,10 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
                goto fail;
 
        /* install */
-       hash_add(unbound_pool_hash, &pool->hash_node, hash);
+       if (cgroups_enabled)
+               list_add_tail(&pool->unbound_node, &unbound_cgpool);
+       else
+               hash_add(unbound_pool_hash, &pool->hash_node, hash);
 
        return pool;
 fail:
@@ -3390,6 +3521,8 @@ static void pwq_unbound_release_workfn(struct work_struct *work)
        is_last = list_empty(&wq->pwqs);
        mutex_unlock(&wq->mutex);
 
+       remove_task_cgpool(pool, pwq->owner);
+
        mutex_lock(&wq_pool_mutex);
        put_unbound_pool(pool);
        mutex_unlock(&wq_pool_mutex);
@@ -3462,6 +3595,11 @@ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
        pwq->wq = wq;
        pwq->flush_color = -1;
        pwq->refcnt = 1;
+       if (pool->attrs->cg_enabled) {
+               /* Add the current task to pool cg_owners */
+               WARN_ON(!attach_task_cgpool(pool, current));
+               pwq->owner = current;
+       }
        INIT_LIST_HEAD(&pwq->delayed_works);
        INIT_LIST_HEAD(&pwq->pwqs_node);
        INIT_LIST_HEAD(&pwq->mayday_node);
@@ -3502,7 +3640,11 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
        if (!pool)
                return NULL;
 
-       pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+       if (wq->unbound_attrs->cg_enabled)
+               pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
+       else
+               pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+
        if (!pwq) {
                put_unbound_pool(pool);
                return NULL;
@@ -3590,8 +3732,10 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
        if (ctx) {
                int node;
 
-               for_each_node(node)
-                       put_pwq_unlocked(ctx->pwq_tbl[node]);
+               if (!ctx->attrs->cg_enabled) {
+                       for_each_node(node)
+                               put_pwq_unlocked(ctx->pwq_tbl[node]);
+               }
                put_pwq_unlocked(ctx->dfl_pwq);
 
                free_workqueue_attrs(ctx->attrs);
@@ -3607,11 +3751,14 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 {
        struct apply_wqattrs_ctx *ctx;
        struct workqueue_attrs *new_attrs, *tmp_attrs;
-       int node;
+       int node, numa_nodes = nr_node_ids;
+       bool cgroups_enabled = wq->unbound_attrs->cg_enabled;
 
        lockdep_assert_held(&wq_pool_mutex);
 
-       ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+       if (cgroups_enabled)
+               numa_nodes = 0;
+       ctx = kzalloc(sizeof(*ctx) + numa_nodes * sizeof(ctx->pwq_tbl[0]),
                      GFP_KERNEL);
 
        new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
@@ -3623,6 +3770,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
         * Calculate the attrs of the default pwq.
         * If the user configured cpumask doesn't overlap with the
         * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
+        * This does not copy attrs->cg_enabled
         */
        copy_workqueue_attrs(new_attrs, attrs);
        cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
@@ -3640,11 +3788,16 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
         * If something goes wrong during CPU up/down, we'll fall back to
         * the default pwq covering whole @attrs->cpumask.  Always create
         * it even if we don't use it immediately.
+        * For cgroups aware wqs, there will be only one pwq
         */
+       new_attrs->cg_enabled = cgroups_enabled;
        ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
        if (!ctx->dfl_pwq)
                goto out_free;
 
+       if (cgroups_enabled)
+               goto done;
+
        for_each_node(node) {
                if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
                        ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
@@ -3656,8 +3809,10 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
                }
        }
 
+done:
        /* save the user configured attrs and sanitize it. */
        copy_workqueue_attrs(new_attrs, attrs);
+       /* at this point, note that cg_enabled is untouched */
        cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
        ctx->attrs = new_attrs;
 
@@ -3676,16 +3831,23 @@ out_free:
 static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
 {
        int node;
+       bool cgroups_enabled = ctx->wq->unbound_attrs->cg_enabled;
 
        /* all pwqs have been created successfully, let's install'em */
        mutex_lock(&ctx->wq->mutex);
 
        copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
-       /* save the previous pwq and install the new one */
+       /*
+        * save the previous pwq and install the new one
+        * if WQ_CGROUPS is set, then we don't allocate space for pwq_tbl at all
+        * so in that case, only dfl_pwq is valid
+        */
+       if (!cgroups_enabled) {
        for_each_node(node)
                ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
                                                          ctx->pwq_tbl[node]);
+       }
 
        /* @dfl_pwq might not have been used, ensure it's linked */
        link_pwq(ctx->dfl_pwq);
@@ -3882,6 +4044,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 static int wq_clamp_max_active(int max_active, unsigned int flags,
                               const char *name)
 {
+       /* Determine max for cgroups ? */
        int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
 
        if (max_active < 1 || max_active > lim)
@@ -3901,14 +4064,30 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
        va_list args;
        struct workqueue_struct *wq;
        struct pool_workqueue *pwq;
+       bool cgroups_enabled = false;
+
+#ifdef CONFIG_CGROUPS
+       /* Only unbound workqueues but not ordered */
+       if ((flags & WQ_CGROUPS) && (flags & WQ_UNBOUND) &&
+           !(flags & __WQ_ORDERED))
+               cgroups_enabled = true;
+#endif
 
        /* see the comment above the definition of WQ_POWER_EFFICIENT */
-       if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
+       if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) {
                flags |= WQ_UNBOUND;
+               if (cgroups_enabled) {
+                       pr_warn("workqueue: disabling cgroups because 
WQ_POWER_EFFICIENT specified");
+                       cgroups_enabled = false;
+               }
+       }
 
        /* allocate wq and format name */
-       if (flags & WQ_UNBOUND)
-               tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+       if (flags & WQ_UNBOUND) {
+               if (!cgroups_enabled)
+                       tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+               /* else let cgroups take care of us */
+       }
 
        wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
        if (!wq)
@@ -3918,6 +4097,8 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
                wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
                if (!wq->unbound_attrs)
                        goto err_free_wq;
+               if (cgroups_enabled)
+                       wq->unbound_attrs->cg_enabled = true;
        }
 
        va_start(args, lock_name);
@@ -4980,6 +5161,9 @@ static ssize_t wq_pool_ids_show(struct device *dev,
        const char *delim = "";
        int node, written = 0;
 
+       if (wq->unbound_attrs->cg_enabled)
+               return 0;
+
        rcu_read_lock_sched();
        for_each_node(node) {
                written += scnprintf(buf + written, PAGE_SIZE - written,
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index 4521587..49228cab 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -52,6 +52,10 @@ struct worker {
 
        /* used only by rescuers to point to the target workqueue */
        struct workqueue_struct *rescue_wq;     /* I: the workqueue to rescue */
+
+       /* for cgroups */
+       bool attach_pending;
+       struct task_struct *attach_to;
 };
 
 /**
-- 
2.5.0
