Blob Blame History Raw
From: Trond Myklebust <trond.myklebust@hammerspace.com>
Date: Tue, 25 May 2021 18:43:38 -0400
Subject: [PATCH] SUNRPC: More fixes for backlog congestion
Git-commit: e86be3a04bc4aeaf12f93af35f08f8d4385bcd98
Patch-mainline: v5.13
References: bsc#1185428

Ensure that we fix the XPRT_CONGESTED starvation issue for RDMA as well
as socket based transports.
Ensure we always initialise the request after waking up from the backlog
list.

Fixes: e877a88d1f06 ("SUNRPC in case of backlog, hand free slots directly to waiting task")
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Acked-by: NeilBrown <neilb@suse.com>

---
 include/linux/sunrpc/xprt.h     |    2 +
 net/sunrpc/xprt.c               |   76 +++++++++++++++++++---------------------
 net/sunrpc/xprtrdma/transport.c |   10 +++--
 net/sunrpc/xprtrdma/verbs.c     |   24 ++++++++++++
 net/sunrpc/xprtrdma/xprt_rdma.h |    1 
 5 files changed, 70 insertions(+), 43 deletions(-)

--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -349,6 +349,8 @@ struct rpc_xprt *	xprt_alloc(struct net
 				unsigned int num_prealloc,
 				unsigned int max_req);
 void			xprt_free(struct rpc_xprt *);
+void			xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
+bool			xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);
 
 static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
 {
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1083,11 +1083,18 @@ void xprt_transmit(struct rpc_task *task
 	}
 }
 
-static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+static void xprt_complete_request_init(struct rpc_task *task)
+{
+	if (task->tk_rqstp)
+		xprt_request_init(task);
+}
+
+void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	set_bit(XPRT_CONGESTED, &xprt->state);
-	rpc_sleep_on(&xprt->backlog, task, NULL);
+	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
 }
+EXPORT_SYMBOL_GPL(xprt_add_backlog);
 
 static bool __xprt_set_rq(struct rpc_task *task, void *data)
 {
@@ -1095,14 +1102,13 @@ static bool __xprt_set_rq(struct rpc_tas
 
 	if (task->tk_rqstp == NULL) {
 		memset(req, 0, sizeof(*req));	/* mark unused */
-		task->tk_status = -EAGAIN;
 		task->tk_rqstp = req;
 		return true;
 	}
 	return false;
 }
 
-static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
+bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
 	if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
 		clear_bit(XPRT_CONGESTED, &xprt->state);
@@ -1110,6 +1116,7 @@ static bool xprt_wake_up_backlog(struct
 	}
 	return true;
 }
+EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
 
 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
 {
@@ -1119,7 +1126,7 @@ static bool xprt_throttle_congested(stru
 		goto out;
 	spin_lock(&xprt->reserve_lock);
 	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
-		rpc_sleep_on(&xprt->backlog, task, NULL);
+		xprt_add_backlog(xprt, task);
 		ret = true;
 	}
 	spin_unlock(&xprt->reserve_lock);
@@ -1289,10 +1296,6 @@ xprt_request_init(struct rpc_task *task)
 	struct rpc_xprt *xprt = task->tk_xprt;
 	struct rpc_rqst	*req = task->tk_rqstp;
 
-	if (req->rq_task)
-		/* Already initialized */
-		return;
-
 	INIT_LIST_HEAD(&req->rq_list);
 	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
 	req->rq_task	= task;
@@ -1355,10 +1358,8 @@ void xprt_retry_reserve(struct rpc_task
 	struct rpc_xprt *xprt = task->tk_xprt;
 
 	task->tk_status = 0;
-	if (task->tk_rqstp != NULL) {
-		xprt_request_init(task);
+	if (task->tk_rqstp != NULL)
 		return;
-	}
 
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
@@ -1385,33 +1386,30 @@ void xprt_release(struct rpc_task *task)
 	}
 
 	xprt = req->rq_xprt;
-	if (xprt) {
-		if (task->tk_ops->rpc_count_stats != NULL)
-			task->tk_ops->rpc_count_stats(task, task->tk_calldata);
-		else if (task->tk_client)
-			rpc_count_iostats(task, task->tk_client->cl_metrics);
-		spin_lock(&xprt->recv_lock);
-		if (!list_empty(&req->rq_list)) {
-			list_del_init(&req->rq_list);
-			xprt_wait_on_pinned_rqst(req);
-		}
-		spin_unlock(&xprt->recv_lock);
-		spin_lock_bh(&xprt->transport_lock);
-		xprt->ops->release_xprt(xprt, task);
-		if (xprt->ops->release_request)
-			xprt->ops->release_request(task);
-		xprt->last_used = jiffies;
-		xprt_schedule_autodisconnect(xprt);
-		spin_unlock_bh(&xprt->transport_lock);
-		if (req->rq_buffer)
-			xprt->ops->buf_free(task);
-		xprt_inject_disconnect(xprt);
-		if (req->rq_cred != NULL)
-			put_rpccred(req->rq_cred);
-		if (req->rq_release_snd_buf)
-			req->rq_release_snd_buf(req);
-	} else
-		xprt = task->tk_xprt;
+	if (task->tk_ops->rpc_count_stats != NULL)
+		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
+	else if (task->tk_client)
+		rpc_count_iostats(task, task->tk_client->cl_metrics);
+	spin_lock(&xprt->recv_lock);
+	if (!list_empty(&req->rq_list)) {
+		list_del_init(&req->rq_list);
+		xprt_wait_on_pinned_rqst(req);
+	}
+	spin_unlock(&xprt->recv_lock);
+	spin_lock_bh(&xprt->transport_lock);
+	xprt->ops->release_xprt(xprt, task);
+	if (xprt->ops->release_request)
+		xprt->ops->release_request(task);
+	xprt->last_used = jiffies;
+	xprt_schedule_autodisconnect(xprt);
+	spin_unlock_bh(&xprt->transport_lock);
+	if (req->rq_buffer)
+		xprt->ops->buf_free(task);
+	xprt_inject_disconnect(xprt);
+	if (req->rq_cred != NULL)
+		put_rpccred(req->rq_cred);
+	if (req->rq_release_snd_buf)
+		req->rq_release_snd_buf(req);
 
 	task->tk_rqstp = NULL;
 	dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -571,8 +571,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xp
 	return;
 
 out_sleep:
-	rpc_sleep_on(&xprt->backlog, task, NULL);
 	task->tk_status = -EAGAIN;
+	xprt_add_backlog(xprt, task);
 }
 
 /**
@@ -584,9 +584,11 @@ out_sleep:
 static void
 xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
 {
-	memset(rqst, 0, sizeof(*rqst));
-	rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
-	rpc_wake_up_next(&xprt->backlog);
+	rpcrdma_reply_put(rpcr_to_rdmar(rqst));
+	if (!xprt_wake_up_backlog(xprt, rqst)) {
+		memset(rqst, 0, sizeof(*rqst));
+		rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
+	}
 }
 
 static bool
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1345,6 +1345,30 @@ rpcrdma_buffer_get(struct rpcrdma_buffer
 }
 
 /**
+ * rpcrdma_reply_put - Put reply buffers back into pool
+ * @req: object to return
+ *
+ */
+void rpcrdma_reply_put(struct rpcrdma_req *req)
+{
+	struct rpcrdma_buffer *buffers = req->rl_buffer;
+	struct rpcrdma_rep *rep = req->rl_reply;
+
+	if (!rep)
+		return;
+
+	req->rl_reply = NULL;
+
+	if (rep->rr_temp)
+		rpcrdma_destroy_rep(rep);
+	else {
+		spin_lock(&buffers->rb_lock);
+		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+		spin_unlock(&buffers->rb_lock);
+	}
+}
+
+/**
  * rpcrdma_buffer_put - Put request/reply buffers back into pool
  * @req: object to return
  *
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -583,6 +583,7 @@ void rpcrdma_mr_defer_recovery(struct rp
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
 void rpcrdma_buffer_put(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+void rpcrdma_reply_put(struct rpcrdma_req *req);
 
 struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
 					    gfp_t);