From: Lai Jiangshan <laijs@cn.fujitsu.com>
Date: Tue, 15 Apr 2014 17:52:19 +0800
Subject: workqueue: allow changing attributes of ordered workqueues
Patch-mainline: Never, RT specific
References: SLE Realtime Extension

Allow changing an ordered workqueue's cpumask
Allow changing an ordered workqueue's nice value
Allow registering ordered workqueues in sysfs

Still disallow changing an ordered workqueue's max_active, which would break the ordering guarantee
Still disallow changing an ordered workqueue's no_numa, which would break the ordering guarantee

Changing attributes introduces new pwqs in a given workqueue, and the
old implementation had no way to handle multiple pwqs on ordered
workqueues, so changing attributes of ordered workqueues was disallowed.

After investigating several solutions that try to handle multiple pwqs
on ordered workqueues, I found that the one reusing max_active is the
simplest.  In this solution, a newly introduced pwq's max_active is kept
at *ZERO* until it becomes the oldest pwq.  Thus only one (the oldest)
pwq is active at a time, and ordering is guaranteed.

This solution enforces ordering at a higher level, and non-reentrancy
is enforced along with it, so no work needs to be queued on its previous
pool.  Nor should it be: a work item that repeatedly requeues itself
back-to-back would keep the oldest pwq around and starve the newest pwq
(if any), leaving it inactive forever.
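A rough sketch of the activation rule, mirroring the pwq_active() /
pwq_adjust_max_active() changes in the diff below (simplified, not meant
to be applied as-is):

	/* Only the oldest pwq of an ordered workqueue may run works. */
	static bool pwq_active(struct pool_workqueue *pwq)
	{
		if (pwq->wq->flags & __WQ_ORDERED)
			return pwq == oldest_pwq(pwq->wq); /* tail of wq->pwqs */

		/* every pwq of a non-ordered workqueue is active */
		return true;
	}

pwq_adjust_max_active() leaves max_active at 0 while pwq_active() is
false, and pwq_unbound_release_workfn() re-runs it on the new oldest pwq
once an older one is released, handing over activity in order.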

Build tested only.
This patch depends on the previous patch:
workqueue: add __WQ_FREEZING and remove POOL_FREEZING

Frederic: You can pick this patch into your updated patchset before TJ applies it.

Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
Signed-off-by: Mike Galbraith <mgalbraith@suse.de>
---
 kernel/workqueue.c |   65 ++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 42 insertions(+), 23 deletions(-)

--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -1430,8 +1430,14 @@ static void __queue_work(int cpu, struct
 	 * If @work was previously on a different pool, it might still be
 	 * running there, in which case the work needs to be queued on that
 	 * pool to guarantee non-reentrancy.
+	 *
+	 * For an ordered workqueue, pwqs are guaranteed to become active
+	 * in order, which already guarantees non-reentrancy for works, so
+	 * no work needs to be queued on its previous pool.  Nor should it
+	 * be: the previous pwq must remain releasable to avoid starving
+	 * works on the newest pool.
 	 */
-	last_pool = get_work_pool(work);
+	last_pool = wq->flags & __WQ_ORDERED ? NULL : get_work_pool(work);
 	if (last_pool && last_pool != pwq->pool) {
 		struct worker *worker;
 
@@ -3448,6 +3454,13 @@ static void rcu_free_pwq(struct rcu_head
 			container_of(rcu, struct pool_workqueue, rcu));
 }
 
+static struct pool_workqueue *oldest_pwq(struct workqueue_struct *wq)
+{
+	return list_last_entry(&wq->pwqs, struct pool_workqueue, pwqs_node);
+}
+
+static void pwq_adjust_max_active(struct pool_workqueue *pwq);
+
 /*
  * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
  * and needs to be destroyed.
@@ -3466,6 +3479,9 @@ static void pwq_unbound_release_workfn(s
 	mutex_lock(&wq->mutex);
 	list_del_rcu(&pwq->pwqs_node);
 	is_last = list_empty(&wq->pwqs);
+	/* try to activate the oldest pwq when needed */
+	if (!is_last && (wq->flags & __WQ_ORDERED))
+		pwq_adjust_max_active(oldest_pwq(wq));
 	mutex_unlock(&wq->mutex);
 
 	mutex_lock(&wq_pool_mutex);
@@ -3482,6 +3498,16 @@ static void pwq_unbound_release_workfn(s
 		call_rcu(&wq->rcu, rcu_free_wq);
 }
 
+static bool pwq_active(struct pool_workqueue *pwq)
+{
+	/* Only the oldest pwq is active in the ordered wq */
+	if (pwq->wq->flags & __WQ_ORDERED)
+		return pwq == oldest_pwq(pwq->wq);
+
+	/* All pwqs in the non-ordered wq are active */
+	return true;
+}
+
 /**
  * pwq_adjust_max_active - update a pwq's max_active to the current setting
  * @pwq: target pool_workqueue
@@ -3511,7 +3537,7 @@ static void pwq_adjust_max_active(struct
 	 * this function is called at least once after @workqueue_freezing
 	 * is updated and visible.
 	 */
-	if (!freezable || !workqueue_freezing) {
+	if ((!freezable || !workqueue_freezing) && pwq_active(pwq)) {
 		pwq->max_active = wq->saved_max_active;
 
 		while (!list_empty(&pwq->delayed_works) &&
@@ -3562,11 +3588,11 @@ static void link_pwq(struct pool_workque
 	/* set the matching work_color */
 	pwq->work_color = wq->work_color;
 
+	/* link in @pwq at the head of &wq->pwqs */
+	list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
+
 	/* sync max_active to the current setting */
 	pwq_adjust_max_active(pwq);
-
-	/* link in @pwq */
-	list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
 }
 
 /* obtain a pool matching @attr and create a pwq associating the pool and @wq */
@@ -3796,8 +3822,8 @@ static int apply_workqueue_attrs_locked(
 	if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
 		return -EINVAL;
 
-	/* creating multiple pwqs breaks ordering guarantee */
-	if (!list_empty(&wq->pwqs)) {
+	/* creating multiple per-node pwqs breaks ordering guarantee */
+	if (!attrs->no_numa) {
 		if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
 			return -EINVAL;
 
@@ -3931,7 +3957,7 @@ static void wq_update_unbound_numa(struc
 static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 {
 	bool highpri = wq->flags & WQ_HIGHPRI;
-	int cpu, ret;
+	int cpu;
 
 	if (!(wq->flags & WQ_UNBOUND)) {
 		wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
@@ -3952,12 +3978,7 @@ static int alloc_and_link_pwqs(struct wo
 		}
 		return 0;
 	} else if (wq->flags & __WQ_ORDERED) {
-		ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
-		/* there should only be single pwq for ordering guarantee */
-		WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
-			      wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
-		     "ordering guarantee broken for workqueue %s\n", wq->name);
-		return ret;
+		return apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
 	} else {
 		return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
 	}
@@ -5214,11 +5235,17 @@ static ssize_t wq_numa_store(struct devi
 			     const char *buf, size_t count)
 {
 	struct workqueue_struct *wq = dev_to_wq(dev);
-	struct workqueue_attrs *attrs;
+	struct workqueue_attrs *attrs = NULL;
 	int v, ret = -ENOMEM;
 
 	apply_wqattrs_lock();
 
+	/* Creating per-node pwqs breaks ordering guarantee. Keep no_numa = 1 */
+	if (WARN_ON(wq->flags & __WQ_ORDERED)) {
+		ret = -EINVAL;
+		goto out_unlock;
+	}
+
 	attrs = wq_sysfs_prep_attrs(wq);
 	if (!attrs)
 		goto out_unlock;
@@ -5321,14 +5348,6 @@ int workqueue_sysfs_register(struct work
 	struct wq_device *wq_dev;
 	int ret;
 
-	/*
-	 * Adjusting max_active or creating new pwqs by applying
-	 * attributes breaks ordering guarantee.  Disallow exposing ordered
-	 * workqueues.
-	 */
-	if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
-		return -EINVAL;
-
 	wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
 	if (!wq_dev)
 		return -ENOMEM;