Blob Blame History Raw
From f237c30a5610d35a584f3296d397b93d80ce374e Mon Sep 17 00:00:00 2001
From: Pavel Begunkov <asml.silence@gmail.com>
Date: Wed, 18 Aug 2021 12:42:46 +0100
Subject: [PATCH] io_uring: batch task work locking
Git-commit: f237c30a5610d35a584f3296d397b93d80ce374e
Patch-mainline: v5.15-rc1
References: bsc#1205205

Many task_work handlers either grab ->uring_lock, or may benefit from
having it. Move locking logic out of individual handlers to a lazy
approach controlled by tctx_task_work(), so we don't keep doing
tons of mutex lock/unlock.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/d6a34e147f2507a2f3e2fa1e38a9c541dcad3929.1629286357.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
Signed-off-by: Gabriel Krisman Bertazi <krisman@suse.de>
---
 fs/io_uring.c | 80 +++++++++++++++++++++++++++++++--------------------
 1 file changed, 49 insertions(+), 31 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 926df5b00b31..cb98a831586a 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -775,7 +775,7 @@ struct async_poll {
 	struct io_poll_iocb	*double_poll;
 };
 
-typedef void (*io_req_tw_func_t)(struct io_kiocb *req);
+typedef void (*io_req_tw_func_t)(struct io_kiocb *req, bool *locked);
 
 struct io_task_work {
 	union {
@@ -1080,6 +1080,14 @@ struct sock *io_uring_get_socket(struct file *file)
 }
 EXPORT_SYMBOL(io_uring_get_socket);
 
+static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
+{
+	if (!*locked) {
+		mutex_lock(&ctx->uring_lock);
+		*locked = true;
+	}
+}
+
 #define io_for_each_link(pos, head) \
 	for (pos = (head); pos; pos = pos->link)
 
@@ -1193,16 +1201,19 @@ static void io_fallback_req_func(struct work_struct *work)
 						fallback_work.work);
 	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
 	struct io_kiocb *req, *tmp;
+	bool locked = false;
 
 	percpu_ref_get(&ctx->refs);
 	llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
-		req->io_task_work.func(req);
+		req->io_task_work.func(req, &locked);
 
-	mutex_lock(&ctx->uring_lock);
-	if (ctx->submit_state.compl_nr)
-		io_submit_flush_completions(ctx);
-	mutex_unlock(&ctx->uring_lock);
+	if (locked) {
+		if (ctx->submit_state.compl_nr)
+			io_submit_flush_completions(ctx);
+		mutex_unlock(&ctx->uring_lock);
+	}
 	percpu_ref_put(&ctx->refs);
+
 }
 
 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
@@ -1386,12 +1397,15 @@ static void io_prep_async_link(struct io_kiocb *req)
 	}
 }
 
-static void io_queue_async_work(struct io_kiocb *req)
+static void io_queue_async_work(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_kiocb *link = io_prep_linked_timeout(req);
 	struct io_uring_task *tctx = req->task->io_uring;
 
+	/* must not take the lock, NULL it as a precaution */
+	locked = NULL;
+
 	BUG_ON(!tctx);
 	BUG_ON(!tctx->io_wq);
 
@@ -2013,21 +2027,22 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
 	return __io_req_find_next(req);
 }
 
-static void ctx_flush_and_put(struct io_ring_ctx *ctx)
+static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
 {
 	if (!ctx)
 		return;
-	if (ctx->submit_state.compl_nr) {
-		mutex_lock(&ctx->uring_lock);
+	if (*locked) {
 		if (ctx->submit_state.compl_nr)
 			io_submit_flush_completions(ctx);
 		mutex_unlock(&ctx->uring_lock);
+		*locked = false;
 	}
 	percpu_ref_put(&ctx->refs);
 }
 
 static void tctx_task_work(struct callback_head *cb)
 {
+	bool locked = false;
 	struct io_ring_ctx *ctx = NULL;
 	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
 						  task_work);
@@ -2050,18 +2065,18 @@ static void tctx_task_work(struct callback_head *cb)
 							    io_task_work.node);
 
 			if (req->ctx != ctx) {
-				ctx_flush_and_put(ctx);
+				ctx_flush_and_put(ctx, &locked);
 				ctx = req->ctx;
 				percpu_ref_get(&ctx->refs);
 			}
-			req->io_task_work.func(req);
+			req->io_task_work.func(req, &locked);
 			node = next;
 		} while (node);
 
 		cond_resched();
 	}
 
-	ctx_flush_and_put(ctx);
+	ctx_flush_and_put(ctx, &locked);
 }
 
 static void io_req_task_work_add(struct io_kiocb *req)
@@ -2113,28 +2128,26 @@ static void io_req_task_work_add(struct io_kiocb *req)
 	}
 }
 
-static void io_req_task_cancel(struct io_kiocb *req)
+static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
 	/* ctx is guaranteed to stay alive while we hold uring_lock */
-	mutex_lock(&ctx->uring_lock);
+	io_tw_lock(ctx, locked);
 	io_req_complete_failed(req, req->result);
-	mutex_unlock(&ctx->uring_lock);
 }
 
-static void io_req_task_submit(struct io_kiocb *req)
+static void io_req_task_submit(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
 	/* ctx stays valid until unlock, even if we drop all ours ctx->refs */
-	mutex_lock(&ctx->uring_lock);
+	io_tw_lock(ctx, locked);
 	/* req->task == current here, checking PF_EXITING is safe */
 	if (likely(!(req->task->flags & PF_EXITING)))
 		__io_queue_sqe(req);
 	else
 		io_req_complete_failed(req, -EFAULT);
-	mutex_unlock(&ctx->uring_lock);
 }
 
 static void io_req_task_queue_fail(struct io_kiocb *req, int ret)
@@ -2170,6 +2183,11 @@ static void io_free_req(struct io_kiocb *req)
 	__io_free_req(req);
 }
 
+static void io_free_req_work(struct io_kiocb *req, bool *locked)
+{
+	io_free_req(req);
+}
+
 struct req_batch {
 	struct task_struct	*task;
 	int			task_refs;
@@ -2267,7 +2285,7 @@ static inline void io_put_req(struct io_kiocb *req)
 static inline void io_put_req_deferred(struct io_kiocb *req)
 {
 	if (req_ref_put_and_test(req)) {
-		req->io_task_work.func = io_free_req;
+		req->io_task_work.func = io_free_req_work;
 		io_req_task_work_add(req);
 	}
 }
@@ -2562,7 +2580,7 @@ static bool __io_complete_rw_common(struct io_kiocb *req, long res)
 	return false;
 }
 
-static void io_req_task_complete(struct io_kiocb *req)
+static void io_req_task_complete(struct io_kiocb *req, bool *locked)
 {
 	__io_req_complete(req, 0, req->result, io_put_rw_kbuf(req));
 }
@@ -2572,7 +2590,7 @@ static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
 {
 	if (__io_complete_rw_common(req, res))
 		return;
-	io_req_task_complete(req);
+	__io_req_complete(req, 0, req->result, io_put_rw_kbuf(req));
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -4994,7 +5012,7 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
 	return !(flags & IORING_CQE_F_MORE);
 }
 
-static void io_poll_task_func(struct io_kiocb *req)
+static void io_poll_task_func(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_kiocb *nxt;
@@ -5018,7 +5036,7 @@ static void io_poll_task_func(struct io_kiocb *req)
 		if (done) {
 			nxt = io_put_req_find_next(req);
 			if (nxt)
-				io_req_task_submit(nxt);
+				io_req_task_submit(nxt, locked);
 		}
 	}
 }
@@ -5130,7 +5148,7 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
 	__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
 }
 
-static void io_async_task_func(struct io_kiocb *req)
+static void io_async_task_func(struct io_kiocb *req, bool *locked)
 {
 	struct async_poll *apoll = req->apoll;
 	struct io_ring_ctx *ctx = req->ctx;
@@ -5147,7 +5165,7 @@ static void io_async_task_func(struct io_kiocb *req)
 	spin_unlock(&ctx->completion_lock);
 
 	if (!READ_ONCE(apoll->poll.canceled))
-		io_req_task_submit(req);
+		io_req_task_submit(req, locked);
 	else
 		io_req_complete_failed(req, -ECANCELED);
 }
@@ -5546,7 +5564,7 @@ static int io_poll_update(struct io_kiocb *req, unsigned int issue_flags)
 	return 0;
 }
 
-static void io_req_task_timeout(struct io_kiocb *req)
+static void io_req_task_timeout(struct io_kiocb *req, bool *locked)
 {
 	req_set_fail(req);
 	io_req_complete_post(req, -ETIME, 0);
@@ -6103,7 +6121,7 @@ static bool io_drain_req(struct io_kiocb *req)
 	if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
 		spin_unlock(&ctx->completion_lock);
 		kfree(de);
-		io_queue_async_work(req);
+		io_queue_async_work(req, NULL);
 		return true;
 	}
 
@@ -6425,7 +6443,7 @@ static inline struct file *io_file_get(struct io_ring_ctx *ctx,
 		return io_file_get_normal(ctx, req, fd);
 }
 
-static void io_req_task_link_timeout(struct io_kiocb *req)
+static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
 {
 	struct io_kiocb *prev = req->timeout.prev;
 	int ret;
@@ -6529,7 +6547,7 @@ static void __io_queue_sqe(struct io_kiocb *req)
 			 * Queued up for async execution, worker will release
 			 * submit reference when the iocb is actually submitted.
 			 */
-			io_queue_async_work(req);
+			io_queue_async_work(req, NULL);
 			break;
 		}
 
@@ -6554,7 +6572,7 @@ static inline void io_queue_sqe(struct io_kiocb *req)
 		if (unlikely(ret))
 			io_req_complete_failed(req, ret);
 		else
-			io_queue_async_work(req);
+			io_queue_async_work(req, NULL);
 	}
 }
 
-- 
2.35.3