Blob Blame History Raw
From e7a6c00dc77aedf27a601738ea509f1caea6d673 Mon Sep 17 00:00:00 2001
From: Jens Axboe <axboe@kernel.dk>
Date: Fri, 4 Mar 2022 08:22:22 -0700
Subject: [PATCH] io_uring: add support for registering ring file descriptors
Git-commit: e7a6c00dc77aedf27a601738ea509f1caea6d673
Patch-mainline: v5.18-rc1
References: bsc#1205205

Lots of workloads use multiple threads, in which case the file table is
shared between them. This makes getting and putting the ring file
descriptor for each io_uring_enter(2) system call more expensive, as it
involves an atomic get and put for each call.

Similarly to how we allow registering normal file descriptors to avoid
this overhead, add support for an io_uring_register(2) API that allows
to register the ring fds themselves:

1) IORING_REGISTER_RING_FDS - takes an array of io_uring_rsrc_update
   structs, and registers them with the task.
2) IORING_UNREGISTER_RING_FDS - takes an array of io_uring_src_update
   structs, and unregisters them.

When a ring fd is registered, it is internally represented by an offset.
This offset is returned to the application, and the application then
uses this offset and sets IORING_ENTER_REGISTERED_RING for the
io_uring_enter(2) system call. This works just like using a registered
file descriptor, rather than a real one, in an SQE, where
IOSQE_FIXED_FILE gets set to tell io_uring that we're using an internal
offset/descriptor rather than a real file descriptor.

In initial testing, this provides a nice bump in performance for
threaded applications in real world cases where the batch count (eg
number of requests submitted per io_uring_enter(2) invocation) is low.
In a microbenchmark, submitting NOP requests, we see the following
increases in performance:

Requests per syscall	Baseline	Registered	Increase
----------------------------------------------------------------
1			 ~7030K		 ~8080K		+15%
2			~13120K		~14800K		+13%
4			~22740K		~25300K		+11%

Co-developed-by: Xiaoguang Wang <xiaoguang.wang@linux.alibaba.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
Signed-off-by: Gabriel Krisman Bertazi <krisman@suse.de>
---
 fs/io_uring.c                 | 182 +++++++++++++++++++++++++++++++++-
 include/linux/io_uring.h      |   5 +-
 include/uapi/linux/io_uring.h |  13 ++-
 3 files changed, 190 insertions(+), 10 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index f8fd3b2bb30d..daa3609b742f 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -466,6 +466,11 @@ struct io_ring_ctx {
 	};
 };
 
+/*
+ * Arbitrary limit, can be raised if need be
+ */
+#define IO_RINGFD_REG_MAX 16
+
 struct io_uring_task {
 	/* submission side */
 	int			cached_refs;
@@ -481,6 +486,7 @@ struct io_uring_task {
 	struct io_wq_work_list	task_list;
 	struct io_wq_work_list	prior_task_list;
 	struct callback_head	task_work;
+	struct file		**registered_rings;
 	bool			task_running;
 };
 
@@ -8788,8 +8794,16 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
 	if (unlikely(!tctx))
 		return -ENOMEM;
 
+	tctx->registered_rings = kcalloc(IO_RINGFD_REG_MAX,
+					 sizeof(struct file *), GFP_KERNEL);
+	if (unlikely(!tctx->registered_rings)) {
+		kfree(tctx);
+		return -ENOMEM;
+	}
+
 	ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL);
 	if (unlikely(ret)) {
+		kfree(tctx->registered_rings);
 		kfree(tctx);
 		return ret;
 	}
@@ -8798,6 +8812,7 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
 	if (IS_ERR(tctx->io_wq)) {
 		ret = PTR_ERR(tctx->io_wq);
 		percpu_counter_destroy(&tctx->inflight);
+		kfree(tctx->registered_rings);
 		kfree(tctx);
 		return ret;
 	}
@@ -8822,6 +8837,7 @@ void __io_uring_free(struct task_struct *tsk)
 	WARN_ON_ONCE(tctx->io_wq);
 	WARN_ON_ONCE(tctx->cached_refs);
 
+	kfree(tctx->registered_rings);
 	percpu_counter_destroy(&tctx->inflight);
 	kfree(tctx);
 	tsk->io_uring = NULL;
@@ -10043,6 +10059,139 @@ void __io_uring_cancel(bool cancel_all)
 	io_uring_cancel_generic(cancel_all, NULL);
 }
 
+void io_uring_unreg_ringfd(void)
+{
+	struct io_uring_task *tctx = current->io_uring;
+	int i;
+
+	for (i = 0; i < IO_RINGFD_REG_MAX; i++) {
+		if (tctx->registered_rings[i]) {
+			fput(tctx->registered_rings[i]);
+			tctx->registered_rings[i] = NULL;
+		}
+	}
+}
+
+static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd,
+				     int start, int end)
+{
+	struct file *file;
+	int offset;
+
+	for (offset = start; offset < end; offset++) {
+		offset = array_index_nospec(offset, IO_RINGFD_REG_MAX);
+		if (tctx->registered_rings[offset])
+			continue;
+
+		file = fget(fd);
+		if (!file) {
+			return -EBADF;
+		} else if (file->f_op != &io_uring_fops) {
+			fput(file);
+			return -EOPNOTSUPP;
+		}
+		tctx->registered_rings[offset] = file;
+		return offset;
+	}
+
+	return -EBUSY;
+}
+
+/*
+ * Register a ring fd to avoid fdget/fdput for each io_uring_enter()
+ * invocation. User passes in an array of struct io_uring_rsrc_update
+ * with ->data set to the ring_fd, and ->offset given for the desired
+ * index. If no index is desired, application may set ->offset == -1U
+ * and we'll find an available index. Returns number of entries
+ * successfully processed, or < 0 on error if none were processed.
+ */
+static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
+			      unsigned nr_args)
+{
+	struct io_uring_rsrc_update __user *arg = __arg;
+	struct io_uring_rsrc_update reg;
+	struct io_uring_task *tctx;
+	int ret, i;
+
+	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+		return -EINVAL;
+
+	mutex_unlock(&ctx->uring_lock);
+	ret = io_uring_add_tctx_node(ctx);
+	mutex_lock(&ctx->uring_lock);
+	if (ret)
+		return ret;
+
+	tctx = current->io_uring;
+	for (i = 0; i < nr_args; i++) {
+		int start, end;
+
+		if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
+			ret = -EFAULT;
+			break;
+		}
+
+		if (reg.offset == -1U) {
+			start = 0;
+			end = IO_RINGFD_REG_MAX;
+		} else {
+			if (reg.offset >= IO_RINGFD_REG_MAX) {
+				ret = -EINVAL;
+				break;
+			}
+			start = reg.offset;
+			end = start + 1;
+		}
+
+		ret = io_ring_add_registered_fd(tctx, reg.data, start, end);
+		if (ret < 0)
+			break;
+
+		reg.offset = ret;
+		if (copy_to_user(&arg[i], &reg, sizeof(reg))) {
+			fput(tctx->registered_rings[reg.offset]);
+			tctx->registered_rings[reg.offset] = NULL;
+			ret = -EFAULT;
+			break;
+		}
+	}
+
+	return i ? i : ret;
+}
+
+static int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg,
+				unsigned nr_args)
+{
+	struct io_uring_rsrc_update __user *arg = __arg;
+	struct io_uring_task *tctx = current->io_uring;
+	struct io_uring_rsrc_update reg;
+	int ret = 0, i;
+
+	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+		return -EINVAL;
+	if (!tctx)
+		return 0;
+
+	for (i = 0; i < nr_args; i++) {
+		if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
+			ret = -EFAULT;
+			break;
+		}
+		if (reg.offset >= IO_RINGFD_REG_MAX) {
+			ret = -EINVAL;
+			break;
+		}
+
+		reg.offset = array_index_nospec(reg.offset, IO_RINGFD_REG_MAX);
+		if (tctx->registered_rings[reg.offset]) {
+			fput(tctx->registered_rings[reg.offset]);
+			tctx->registered_rings[reg.offset] = NULL;
+		}
+	}
+
+	return i ? i : ret;
+}
+
 static void *io_uring_validate_mmap_request(struct file *file,
 					    loff_t pgoff, size_t sz)
 {
@@ -10173,12 +10322,28 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	io_run_task_work();
 
 	if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
-			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG)))
+			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
+			       IORING_ENTER_REGISTERED_RING)))
 		return -EINVAL;
 
-	f = fdget(fd);
-	if (unlikely(!f.file))
-		return -EBADF;
+	/*
+	 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
+	 * need only dereference our task private array to find it.
+	 */
+	if (flags & IORING_ENTER_REGISTERED_RING) {
+		struct io_uring_task *tctx = current->io_uring;
+
+		if (!tctx || fd >= IO_RINGFD_REG_MAX)
+			return -EINVAL;
+		fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
+		f.file = tctx->registered_rings[fd];
+		if (unlikely(!f.file))
+			return -EBADF;
+	} else {
+		f = fdget(fd);
+		if (unlikely(!f.file))
+			return -EBADF;
+	}
 
 	ret = -EOPNOTSUPP;
 	if (unlikely(f.file->f_op != &io_uring_fops))
@@ -10252,7 +10417,8 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 out:
 	percpu_ref_put(&ctx->refs);
 out_fput:
-	fdput(f);
+	if (!(flags & IORING_ENTER_REGISTERED_RING))
+		fdput(f);
 	return submitted ? submitted : ret;
 }
 
@@ -11142,6 +11308,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			break;
 		ret = io_register_iowq_max_workers(ctx, arg);
 		break;
+	case IORING_REGISTER_RING_FDS:
+		ret = io_ringfd_register(ctx, arg, nr_args);
+		break;
+	case IORING_UNREGISTER_RING_FDS:
+		ret = io_ringfd_unregister(ctx, arg, nr_args);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h
index 649a4d7c241b..1814e698d861 100644
--- a/include/linux/io_uring.h
+++ b/include/linux/io_uring.h
@@ -9,11 +9,14 @@
 struct sock *io_uring_get_socket(struct file *file);
 void __io_uring_cancel(bool cancel_all);
 void __io_uring_free(struct task_struct *tsk);
+void io_uring_unreg_ringfd(void);
 
 static inline void io_uring_files_cancel(void)
 {
-	if (current->io_uring)
+	if (current->io_uring) {
+		io_uring_unreg_ringfd();
 		__io_uring_cancel(false);
+	}
 }
 static inline void io_uring_task_cancel(void)
 {
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 787f491f0d2a..42b2fe84dbcd 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -257,10 +257,11 @@ struct io_cqring_offsets {
 /*
  * io_uring_enter(2) flags
  */
-#define IORING_ENTER_GETEVENTS	(1U << 0)
-#define IORING_ENTER_SQ_WAKEUP	(1U << 1)
-#define IORING_ENTER_SQ_WAIT	(1U << 2)
-#define IORING_ENTER_EXT_ARG	(1U << 3)
+#define IORING_ENTER_GETEVENTS		(1U << 0)
+#define IORING_ENTER_SQ_WAKEUP		(1U << 1)
+#define IORING_ENTER_SQ_WAIT		(1U << 2)
+#define IORING_ENTER_EXT_ARG		(1U << 3)
+#define IORING_ENTER_REGISTERED_RING	(1U << 4)
 
 /*
  * Passed in for io_uring_setup(2). Copied back with updated info on success
@@ -325,6 +326,10 @@ enum {
 	/* set/get max number of io-wq workers */
 	IORING_REGISTER_IOWQ_MAX_WORKERS	= 19,
 
+	/* register/unregister io_uring fd with the ring */
+	IORING_REGISTER_RING_FDS		= 20,
+	IORING_UNREGISTER_RING_FDS		= 21,
+
 	/* this goes last */
 	IORING_REGISTER_LAST
 };
-- 
2.35.3