Blob Blame History Raw
From: Chuck Lever <chuck.lever@oracle.com>
Date: Fri, 4 May 2018 15:35:20 -0400
Subject: xprtrdma: Move Receive posting to Receive handler
Patch-mainline: v4.18-rc1
Git-commit: 7c8d9e7c8863905951d4eaa7a8d277150f3a37f7
References: bsc#1103992 FATE#326009

Receive completion and Reply handling are done by a BOUND
workqueue, meaning they run on only one CPU.

Posting receives is currently done in the send_request path, which
on large systems is typically done on a different CPU than the one
handling Receive completions. This results in movement of
Receive-related cachelines between the sending and receiving CPUs.

More importantly, it means that currently Receives are posted while
the transport's write lock is held, which is unnecessary and costly.

Finally, allocation of Receive buffers is performed on-demand in
the Receive completion handler. This helps guarantee that they are
allocated on the same NUMA node as the CPU that handles Receive
completions.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Acked-by: Thomas Bogendoerfer <tbogendoerfer@suse.de>
---
 include/trace/events/rpcrdma.h    |   40 +++++++-
 net/sunrpc/xprtrdma/backchannel.c |   32 +-----
 net/sunrpc/xprtrdma/rpc_rdma.c    |   22 +---
 net/sunrpc/xprtrdma/transport.c   |    3 
 net/sunrpc/xprtrdma/verbs.c       |  176 +++++++++++++++++++++-----------------
 net/sunrpc/xprtrdma/xprt_rdma.h   |    6 -
 6 files changed, 150 insertions(+), 129 deletions(-)

--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -548,6 +548,39 @@ TRACE_EVENT(xprtrdma_post_recv,
 	)
 );
 
+TRACE_EVENT(xprtrdma_post_recvs,
+	TP_PROTO(
+		const struct rpcrdma_xprt *r_xprt,
+		unsigned int count,
+		int status
+	),
+
+	TP_ARGS(r_xprt, count, status),
+
+	TP_STRUCT__entry(
+		__field(const void *, r_xprt)
+		__field(unsigned int, count)
+		__field(int, status)
+		__field(int, posted)
+		__string(addr, rpcrdma_addrstr(r_xprt))
+		__string(port, rpcrdma_portstr(r_xprt))
+	),
+
+	TP_fast_assign(
+		__entry->r_xprt = r_xprt;
+		__entry->count = count;
+		__entry->status = status;
+		__entry->posted = r_xprt->rx_buf.rb_posted_receives;
+		__assign_str(addr, rpcrdma_addrstr(r_xprt));
+		__assign_str(port, rpcrdma_portstr(r_xprt));
+	),
+
+	TP_printk("peer=[%s]:%s r_xprt=%p: %u new recvs, %d active (rc %d)",
+		__get_str(addr), __get_str(port), __entry->r_xprt,
+		__entry->count, __entry->posted, __entry->status
+	)
+);
+
 /**
  ** Completion events
  **/
@@ -802,7 +835,6 @@ TRACE_EVENT(xprtrdma_allocate,
 		__field(unsigned int, task_id)
 		__field(unsigned int, client_id)
 		__field(const void *, req)
-		__field(const void *, rep)
 		__field(size_t, callsize)
 		__field(size_t, rcvsize)
 	),
@@ -811,15 +843,13 @@ TRACE_EVENT(xprtrdma_allocate,
 		__entry->task_id = task->tk_pid;
 		__entry->client_id = task->tk_client->cl_clid;
 		__entry->req = req;
-		__entry->rep = req ? req->rl_reply : NULL;
 		__entry->callsize = task->tk_rqstp->rq_callsize;
 		__entry->rcvsize = task->tk_rqstp->rq_rcvsize;
 	),
 
-	TP_printk("task:%u@%u req=%p rep=%p (%zu, %zu)",
+	TP_printk("task:%u@%u req=%p (%zu, %zu)",
 		__entry->task_id, __entry->client_id,
-		__entry->req, __entry->rep,
-		__entry->callsize, __entry->rcvsize
+		__entry->req, __entry->callsize, __entry->rcvsize
 	)
 );
 
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -72,23 +72,6 @@ out_fail:
 	return -ENOMEM;
 }
 
-/* Allocate and add receive buffers to the rpcrdma_buffer's
- * existing list of rep's. These are released when the
- * transport is destroyed.
- */
-static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
-				 unsigned int count)
-{
-	int rc = 0;
-
-	while (count--) {
-		rc = rpcrdma_create_rep(r_xprt);
-		if (rc)
-			break;
-	}
-	return rc;
-}
-
 /**
  * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
  * @xprt: transport associated with these backchannel resources
@@ -117,14 +100,6 @@ int xprt_rdma_bc_setup(struct rpc_xprt *
 	if (rc)
 		goto out_free;
 
-	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
-	if (rc)
-		goto out_free;
-
-	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
-	if (rc)
-		goto out_free;
-
 	r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
 	request_module("svcrdma");
 	trace_xprtrdma_cb_setup(r_xprt, reqs);
@@ -229,6 +204,7 @@ int xprt_rdma_bc_send_reply(struct rpc_r
 	if (rc < 0)
 		goto failed_marshal;
 
+	rpcrdma_post_recvs(r_xprt, true);
 	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 		goto drop_connection;
 	return 0;
@@ -269,10 +245,14 @@ void xprt_rdma_bc_destroy(struct rpc_xpr
  */
 void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
 {
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 
 	dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
-		__func__, rqst, rpcr_to_rdmar(rqst));
+		__func__, rqst, req);
+
+	rpcrdma_recv_buffer_put(req->rl_reply);
+	req->rl_reply = NULL;
 
 	spin_lock_bh(&xprt->bc_pa_lock);
 	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1030,8 +1030,6 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_
 
 out_short:
 	pr_warn("RPC/RDMA short backward direction call\n");
-	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
-		xprt_disconnect_done(&r_xprt->rx_xprt);
 	return true;
 }
 #else	/* CONFIG_SUNRPC_BACKCHANNEL */
@@ -1337,13 +1335,14 @@ void rpcrdma_reply_handler(struct rpcrdm
 	u32 credits;
 	__be32 *p;
 
+	--buf->rb_posted_receives;
+
 	if (rep->rr_hdrbuf.head[0].iov_len == 0)
 		goto out_badstatus;
 
+	/* Fixed transport header fields */
 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
 			rep->rr_hdrbuf.head[0].iov_base);
-
-	/* Fixed transport header fields */
 	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
 	if (unlikely(!p))
 		goto out_shortreply;
@@ -1382,17 +1381,10 @@ void rpcrdma_reply_handler(struct rpcrdm
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
 
+	rpcrdma_post_recvs(r_xprt, false);
 	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
 
-out_badstatus:
-	rpcrdma_recv_buffer_put(rep);
-	if (r_xprt->rx_ep.rep_connected == 1) {
-		r_xprt->rx_ep.rep_connected = -EIO;
-		rpcrdma_conn_func(&r_xprt->rx_ep);
-	}
-	return;
-
 out_badversion:
 	trace_xprtrdma_reply_vers(rep);
 	goto repost;
@@ -1412,7 +1404,7 @@ out_shortreply:
  * receive buffer before returning.
  */
 repost:
-	r_xprt->rx_stats.bad_reply_count++;
-	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
-		rpcrdma_recv_buffer_put(rep);
+	rpcrdma_post_recvs(r_xprt, false);
+out_badstatus:
+	rpcrdma_recv_buffer_put(rep);
 }
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -728,9 +728,6 @@ xprt_rdma_send_request(struct rpc_task *
 	if (rc < 0)
 		goto failed_marshal;
 
-	if (req->rl_reply == NULL) 		/* e.g. reconnection */
-		rpcrdma_recv_buffer_get(req);
-
 	/* Must suppress retransmit to maintain credits */
 	if (rqst->rq_connect_cookie == xprt->connect_cookie)
 		goto drop_connection;
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -75,6 +75,7 @@
  */
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
+static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 
 struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
@@ -727,7 +728,6 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep
 {
 	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 						   rx_ia);
-	unsigned int extras;
 	int rc;
 
 retry:
@@ -771,9 +771,8 @@ retry:
 	}
 
 	dprintk("RPC:       %s: connected\n", __func__);
-	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
-	if (extras)
-		rpcrdma_ep_post_extra_recv(r_xprt, extras);
+
+	rpcrdma_post_recvs(r_xprt, true);
 
 out:
 	if (rc)
@@ -1083,14 +1082,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *
 	return req;
 }
 
-/**
- * rpcrdma_create_rep - Allocate an rpcrdma_rep object
- * @r_xprt: controlling transport
- *
- * Returns 0 on success or a negative errno on failure.
- */
-int
-rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
+static int
+rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
@@ -1118,6 +1111,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *
 	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	rep->rr_recv_wr.num_sge = 1;
+	rep->rr_temp = temp;
 
 	spin_lock(&buf->rb_lock);
 	list_add(&rep->rr_list, &buf->rb_recv_bufs);
@@ -1169,12 +1163,8 @@ rpcrdma_buffer_create(struct rpcrdma_xpr
 		list_add(&req->rl_list, &buf->rb_send_bufs);
 	}
 
+	buf->rb_posted_receives = 0;
 	INIT_LIST_HEAD(&buf->rb_recv_bufs);
-	for (i = 0; i <= buf->rb_max_requests; i++) {
-		rc = rpcrdma_create_rep(r_xprt);
-		if (rc)
-			goto out;
-	}
 
 	rc = rpcrdma_sendctxs_create(r_xprt);
 	if (rc)
@@ -1264,7 +1254,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_bu
 		rep = rpcrdma_buffer_get_rep_locked(buf);
 		rpcrdma_destroy_rep(rep);
 	}
-	buf->rb_send_count = 0;
 
 	spin_lock(&buf->rb_reqslock);
 	while (!list_empty(&buf->rb_allreqs)) {
@@ -1279,7 +1268,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_bu
 		spin_lock(&buf->rb_reqslock);
 	}
 	spin_unlock(&buf->rb_reqslock);
-	buf->rb_recv_count = 0;
 
 	rpcrdma_mrs_destroy(buf);
 }
@@ -1352,27 +1340,11 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_
 	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
 }
 
-static struct rpcrdma_rep *
-rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
-{
-	/* If an RPC previously completed without a reply (say, a
-	 * credential problem or a soft timeout occurs) then hold off
-	 * on supplying more Receive buffers until the number of new
-	 * pending RPCs catches up to the number of posted Receives.
-	 */
-	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
-		return NULL;
-
-	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
-		return NULL;
-	buffers->rb_recv_count++;
-	return rpcrdma_buffer_get_rep_locked(buffers);
-}
-
-/*
- * Get a set of request/reply buffers.
+/**
+ * rpcrdma_buffer_get - Get a request buffer
+ * @buffers: Buffer pool from which to obtain a buffer
  *
- * Reply buffer (if available) is attached to send buffer upon return.
+ * Returns a fresh rpcrdma_req, or NULL if none are available.
  */
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
@@ -1380,23 +1352,21 @@ rpcrdma_buffer_get(struct rpcrdma_buffer
 	struct rpcrdma_req *req;
 
 	spin_lock(&buffers->rb_lock);
-	if (list_empty(&buffers->rb_send_bufs))
-		goto out_reqbuf;
-	buffers->rb_send_count++;
+	if (unlikely(list_empty(&buffers->rb_send_bufs)))
+		goto out_noreqs;
 	req = rpcrdma_buffer_get_req_locked(buffers);
-	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
 	spin_unlock(&buffers->rb_lock);
-
 	return req;
 
-out_reqbuf:
+out_noreqs:
 	spin_unlock(&buffers->rb_lock);
 	return NULL;
 }
 
-/*
- * Put request/reply buffers back into pool.
- * Pre-decrement counter/array index.
+/**
+ * rpcrdma_buffer_put - Put request/reply buffers back into pool
+ * @req: object to return
+ *
  */
 void
 rpcrdma_buffer_put(struct rpcrdma_req *req)
@@ -1407,27 +1377,16 @@ rpcrdma_buffer_put(struct rpcrdma_req *r
 	req->rl_reply = NULL;
 
 	spin_lock(&buffers->rb_lock);
-	buffers->rb_send_count--;
-	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
+	list_add(&req->rl_list, &buffers->rb_send_bufs);
 	if (rep) {
-		buffers->rb_recv_count--;
-		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+		if (!rep->rr_temp) {
+			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+			rep = NULL;
+		}
 	}
 	spin_unlock(&buffers->rb_lock);
-}
-
-/*
- * Recover reply buffers from pool.
- * This happens when recovering from disconnect.
- */
-void
-rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
-{
-	struct rpcrdma_buffer *buffers = req->rl_buffer;
-
-	spin_lock(&buffers->rb_lock);
-	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
-	spin_unlock(&buffers->rb_lock);
+	if (rep)
+		rpcrdma_destroy_rep(rep);
 }
 
 /*
@@ -1439,10 +1398,13 @@ rpcrdma_recv_buffer_put(struct rpcrdma_r
 {
 	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
 
-	spin_lock(&buffers->rb_lock);
-	buffers->rb_recv_count--;
-	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
-	spin_unlock(&buffers->rb_lock);
+	if (!rep->rr_temp) {
+		spin_lock(&buffers->rb_lock);
+		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+		spin_unlock(&buffers->rb_lock);
+	} else {
+		rpcrdma_destroy_rep(rep);
+	}
 }
 
 /**
@@ -1538,13 +1500,6 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
 	int rc;
 
-	if (req->rl_reply) {
-		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
-		if (rc)
-			return rc;
-		req->rl_reply = NULL;
-	}
-
 	if (!ep->rep_send_count ||
 	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
 		send_wr->send_flags |= IB_SEND_SIGNALED;
@@ -1619,3 +1574,70 @@ out_rc:
 	rpcrdma_recv_buffer_put(rep);
 	return rc;
 }
+
+/**
+ * rpcrdma_post_recvs - Maybe post some Receive buffers
+ * @r_xprt: controlling transport
+ * @temp: when true, allocate temp rpcrdma_rep objects
+ *
+ */
+void
+rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct ib_recv_wr *wr, *bad_wr;
+	int needed, count, rc;
+
+	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
+	if (buf->rb_posted_receives > needed)
+		return;
+	needed -= buf->rb_posted_receives;
+
+	count = 0;
+	wr = NULL;
+	while (needed) {
+		struct rpcrdma_regbuf *rb;
+		struct rpcrdma_rep *rep;
+
+		spin_lock(&buf->rb_lock);
+		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
+					       struct rpcrdma_rep, rr_list);
+		if (likely(rep))
+			list_del(&rep->rr_list);
+		spin_unlock(&buf->rb_lock);
+		if (!rep) {
+			if (rpcrdma_create_rep(r_xprt, temp))
+				break;
+			continue;
+		}
+
+		rb = rep->rr_rdmabuf;
+		if (!rpcrdma_regbuf_is_mapped(rb)) {
+			if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
+				rpcrdma_recv_buffer_put(rep);
+				break;
+			}
+		}
+
+		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+		rep->rr_recv_wr.next = wr;
+		wr = &rep->rr_recv_wr;
+		++count;
+		--needed;
+	}
+	if (!count)
+		return;
+
+	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, &bad_wr);
+	if (rc) {
+		for (wr = bad_wr; wr; wr = wr->next) {
+			struct rpcrdma_rep *rep;
+
+			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
+			rpcrdma_recv_buffer_put(rep);
+			--count;
+		}
+	}
+	buf->rb_posted_receives += count;
+	trace_xprtrdma_post_recvs(r_xprt, count, rc);
+}
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -197,6 +197,7 @@ struct rpcrdma_rep {
 	__be32			rr_proc;
 	int			rr_wc_flags;
 	u32			rr_inv_rkey;
+	bool			rr_temp;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 	struct rpcrdma_xprt	*rr_rxprt;
 	struct work_struct	rr_work;
@@ -397,11 +398,11 @@ struct rpcrdma_buffer {
 	struct rpcrdma_sendctx	**rb_sc_ctxs;
 
 	spinlock_t		rb_lock;	/* protect buf lists */
-	int			rb_send_count, rb_recv_count;
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
 	u32			rb_max_requests;
 	u32			rb_credits;	/* most recent credit grant */
+	int			rb_posted_receives;
 
 	u32			rb_bc_srv_max_requests;
 	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
@@ -558,13 +559,13 @@ void rpcrdma_ep_disconnect(struct rpcrdm
 int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 				struct rpcrdma_req *);
 int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_rep *);
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
 struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
 void rpcrdma_destroy_req(struct rpcrdma_req *);
-int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
@@ -577,7 +578,6 @@ void rpcrdma_mr_defer_recovery(struct rp
 
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
 void rpcrdma_buffer_put(struct rpcrdma_req *);
-void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
 struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,