From: Trond Myklebust <trond.myklebust@hammerspace.com>
Date: Tue, 25 May 2021 18:43:38 -0400
Subject: [PATCH] SUNRPC: More fixes for backlog congestion
Git-commit: e86be3a04bc4aeaf12f93af35f08f8d4385bcd98
Patch-mainline: v5.13
References: bsc#1185428

Ensure that we fix the XPRT_CONGESTED starvation issue for RDMA as well
as socket-based transports.

Ensure we always initialise the request after waking up from the backlog
list.
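
The change below is easiest to read from the sleeping side: a task that
fails to get a slot is queued on the backlog with an rpc_action
callback, and that callback initialises the rqst in the woken task's
own context (condensed from the xprt.c hunks below):

    static void xprt_complete_request_init(struct rpc_task *task)
    {
            if (task->tk_rqstp)
                    xprt_request_init(task);
    }

    void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
    {
            set_bit(XPRT_CONGESTED, &xprt->state);
            rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
    }

On the release side, xprt_wake_up_backlog() hands the freed rqst
straight to the first waiter via __xprt_set_rq(), so a newly arriving
request can no longer steal the slot while the waiter is still waking
up.
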
Fixes: e877a88d1f06 ("SUNRPC in case of backlog, hand free slots directly to waiting task")
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Acked-by: NeilBrown <neilb@suse.com>
---
include/linux/sunrpc/xprt.h | 2 +
net/sunrpc/xprt.c | 74 +++++++++++++++++++---------------------
net/sunrpc/xprtrdma/transport.c | 10 +++--
net/sunrpc/xprtrdma/verbs.c | 24 ++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
5 files changed, 69 insertions(+), 42 deletions(-)
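
For reference, the slot recycling pattern this gives the RDMA transport,
condensed from the transport.c hunk below (the inline comment is
editorial, not part of the patch):

    static void
    xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
    {
            rpcrdma_reply_put(rpcr_to_rdmar(rqst));
            if (!xprt_wake_up_backlog(xprt, rqst)) {
                    /* no waiter: return the rqst to the buffer pool */
                    memset(rqst, 0, sizeof(*rqst));
                    rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
            }
    }

The reply buffer is released first (or destroyed, for rr_temp replies)
because the rqst may be handed straight to a waiter and never pass
through rpcrdma_buffer_put().
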
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -344,6 +344,8 @@ struct rpc_xprt * xprt_alloc(struct net
unsigned int num_prealloc,
unsigned int max_req);
void xprt_free(struct rpc_xprt *);
+void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
+bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);

static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
{
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1086,11 +1086,18 @@ void xprt_transmit(struct rpc_task *task
}
}

-static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+static void xprt_complete_request_init(struct rpc_task *task)
+{
+ if (task->tk_rqstp)
+ xprt_request_init(task);
+}
+
+void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
set_bit(XPRT_CONGESTED, &xprt->state);
- rpc_sleep_on(&xprt->backlog, task, NULL);
+ rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
}
+EXPORT_SYMBOL_GPL(xprt_add_backlog);

static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
@@ -1098,14 +1105,13 @@ static bool __xprt_set_rq(struct rpc_tas

if (task->tk_rqstp == NULL) {
memset(req, 0, sizeof(*req)); /* mark unused */
- task->tk_status = -EAGAIN;
task->tk_rqstp = req;
return true;
}
return false;
}

-static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
+bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
clear_bit(XPRT_CONGESTED, &xprt->state);
@@ -1113,6 +1119,7 @@ static bool xprt_wake_up_backlog(struct
}
return true;
}
+EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
@@ -1122,7 +1129,7 @@ static bool xprt_throttle_congested(stru
goto out;
spin_lock(&xprt->reserve_lock);
if (test_bit(XPRT_CONGESTED, &xprt->state)) {
- rpc_sleep_on(&xprt->backlog, task, NULL);
+ xprt_add_backlog(xprt, task);
ret = true;
}
spin_unlock(&xprt->reserve_lock);
@@ -1298,10 +1305,6 @@ xprt_request_init(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp;

- if (req->rq_task)
- /* Already initialized */
- return;
-
INIT_LIST_HEAD(&req->rq_list);
req->rq_timeout = task->tk_client->cl_timeout->to_initval;
req->rq_task = task;
@@ -1364,10 +1367,8 @@ void xprt_retry_reserve(struct rpc_task
struct rpc_xprt *xprt = task->tk_xprt;

task->tk_status = 0;
- if (task->tk_rqstp != NULL) {
- xprt_request_init(task);
+ if (task->tk_rqstp != NULL)
return;
- }

task->tk_timeout = 0;
task->tk_status = -EAGAIN;
@@ -1394,32 +1395,29 @@ void xprt_release(struct rpc_task *task)
}

xprt = req->rq_xprt;
- if (xprt) {
- if (task->tk_ops->rpc_count_stats != NULL)
- task->tk_ops->rpc_count_stats(task, task->tk_calldata);
- else if (task->tk_client)
- rpc_count_iostats(task, task->tk_client->cl_metrics);
- spin_lock(&xprt->recv_lock);
- if (!list_empty(&req->rq_list)) {
- list_del_init(&req->rq_list);
- xprt_wait_on_pinned_rqst(req);
- }
- spin_unlock(&xprt->recv_lock);
- spin_lock_bh(&xprt->transport_lock);
- xprt->ops->release_xprt(xprt, task);
- if (xprt->ops->release_request)
- xprt->ops->release_request(task);
- xprt->last_used = jiffies;
- xprt_schedule_autodisconnect(xprt);
- spin_unlock_bh(&xprt->transport_lock);
- if (req->rq_buffer)
- xprt->ops->buf_free(task);
- if (req->rq_cred != NULL)
- put_rpccred(req->rq_cred);
- if (req->rq_release_snd_buf)
- req->rq_release_snd_buf(req);
- } else
- xprt = task->tk_xprt;
+ if (task->tk_ops->rpc_count_stats != NULL)
+ task->tk_ops->rpc_count_stats(task, task->tk_calldata);
+ else if (task->tk_client)
+ rpc_count_iostats(task, task->tk_client->cl_metrics);
+ spin_lock(&xprt->recv_lock);
+ if (!list_empty(&req->rq_list)) {
+ list_del_init(&req->rq_list);
+ xprt_wait_on_pinned_rqst(req);
+ }
+ spin_unlock(&xprt->recv_lock);
+ spin_lock_bh(&xprt->transport_lock);
+ xprt->ops->release_xprt(xprt, task);
+ if (xprt->ops->release_request)
+ xprt->ops->release_request(task);
+ xprt->last_used = jiffies;
+ xprt_schedule_autodisconnect(xprt);
+ spin_unlock_bh(&xprt->transport_lock);
+ if (req->rq_buffer)
+ xprt->ops->buf_free(task);
+ if (req->rq_cred != NULL)
+ put_rpccred(req->rq_cred);
+ if (req->rq_release_snd_buf)
+ req->rq_release_snd_buf(req);

task->tk_rqstp = NULL;
dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -571,8 +571,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xp
return;

out_sleep:
- rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN;
+ xprt_add_backlog(xprt, task);
}

/**
@@ -584,9 +584,11 @@ out_sleep:
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
- memset(rqst, 0, sizeof(*rqst));
- rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
- rpc_wake_up_next(&xprt->backlog);
+ rpcrdma_reply_put(rpcr_to_rdmar(rqst));
+ if (!xprt_wake_up_backlog(xprt, rqst)) {
+ memset(rqst, 0, sizeof(*rqst));
+ rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
+ }
}

static bool
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1345,6 +1345,30 @@ rpcrdma_buffer_get(struct rpcrdma_buffer
}

/**
+ * rpcrdma_reply_put - Put reply buffers back into pool
+ * @req: object to return
+ *
+ */
+void rpcrdma_reply_put(struct rpcrdma_req *req)
+{
+ struct rpcrdma_buffer *buffers = req->rl_buffer;
+ struct rpcrdma_rep *rep = req->rl_reply;
+
+ if (!rep)
+ return;
+
+ req->rl_reply = NULL;
+
+ if (rep->rr_temp)
+ rpcrdma_destroy_rep(rep);
+ else {
+ spin_lock(&buffers->rb_lock);
+ list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+ spin_unlock(&buffers->rb_lock);
+ }
+}
+
+/**
* rpcrdma_buffer_put - Put request/reply buffers back into pool
* @req: object to return
*
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -583,6 +583,7 @@ void rpcrdma_mr_defer_recovery(struct rp
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_req *);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+void rpcrdma_reply_put(struct rpcrdma_req *req);

struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
gfp_t);