From: Chuck Lever <chuck.lever@oracle.com>
Date: Fri, 20 Oct 2017 10:48:12 -0400
Subject: xprtrdma: Add data structure to manage RDMA Send arguments
Patch-mainline: v4.15-rc1
Git-commit: ae72950abf99fb250aca972b3451b6e06a096c68
References: bsc#1103992 FATE#326009

Problem statement:

Recently Sagi Grimberg <sagi@grimberg.me> observed that kernel RDMA-
enabled storage initiators don't handle delayed Send completion
correctly. If Send completion is delayed beyond the end of a ULP
transaction, the ULP may release resources that are still being used
by the HCA to complete a long-running Send operation.

This is a common design trait amongst our initiators. Most Send
operations are faster than the ULP transaction they are part of.
Waiting for a completion for these is typically unnecessary.

Infrequently, a network partition or some other problem crops up
where an ordering problem can occur. In NFS parlance, the RPC Reply
arrives and completes the RPC, but the HCA is still retrying the
Send WR that conveyed the RPC Call. In this case, the HCA can try
to use memory that has been invalidated or DMA unmapped, and the
connection is lost. If that memory has been re-used for something
else (possibly not related to NFS), the Send retransmission
exposes that data on the wire.

Thus we cannot assume that it is safe to release Send-related
resources just because a ULP reply has arrived.

After some analysis, we have determined that the completion
housekeeping will not be difficult for xprtrdma:

 - Inline Send buffers are registered via the local DMA key, and
   are already left DMA mapped for the lifetime of a transport
   connection, thus no additional handling is necessary for those
 - Gathered Sends involving page cache pages _will_ need to
   DMA unmap those pages after the Send completes. But like
   inline send buffers, they are registered via the local DMA key,
   and thus will not need to be invalidated

In addition, RPC completion will need to wait for Send completion
in the latter case. However, nearly always, the Send that conveys
the RPC Call will have completed long before the RPC Reply
arrives, and thus no additional latency will be accrued.
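
To make the second case concrete, here is a minimal sketch of the
completion-time unmapping (it mirrors the rpcrdma_unmap_sendctx()
helper added in the diff below; the function name used here is
illustrative only). Only the page SGEs are unmapped: the first two
SGEs carry the transport header and the inline buffer and stay
DMA-mapped for the lifetime of the connection.

	/* Unmap only the page-cache SGEs of a completed Send. No MR
	 * invalidation is needed because these pages were mapped with
	 * the local DMA lkey.
	 */
	static void unmap_completed_send_pages(struct rpcrdma_sendctx *sc)
	{
		struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
		struct ib_sge *sge = &sc->sc_sges[2];
		unsigned int count;

		for (count = sc->sc_unmap_count; count; ++sge, --count)
			ib_dma_unmap_page(ia->ri_device, sge->addr,
					  sge->length, DMA_TO_DEVICE);
	}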

Design notes:

In this patch, the rpcrdma_sendctx object is introduced, and a
lock-free circular queue is added to manage a set of them per
transport.

The RPC client's send path already prevents sending more than one
RPC Call at the same time. This allows us to treat the consumer
side of the queue (rpcrdma_sendctx_get_locked) as if there is a
single consumer thread.

The producer side of the queue (rpcrdma_sendctx_put_locked) is
invoked only from the Send completion handler, which is a single
thread of execution (soft IRQ).

The only care that needs to be taken is with the tail index, which
is shared between the producer and consumer. Only the producer
updates the tail index. The consumer compares the head with the
tail to ensure that a sendctx that is still in use is never handed
out again (or, expressed more conventionally, to detect that the
queue is empty).
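
A simplified model of the ring described above (the head, tail and
last fields mirror the rb_sc_head, rb_sc_tail, rb_sc_last and
rb_sc_ctxs fields this patch adds to struct rpcrdma_buffer; the
memory barriers of the real code are elided and the function names
are illustrative only):

	/* One consumer (the send path) and one producer (the Send
	 * completion handler). The ring is "empty" when advancing the
	 * head would land on the tail.
	 */
	struct sendctx_ring {
		unsigned long head;	/* last slot handed out (consumer) */
		unsigned long tail;	/* last slot given back (producer) */
		unsigned long last;	/* highest valid array index */
		struct rpcrdma_sendctx **ctxs;
	};

	static unsigned long ring_next(struct sendctx_ring *r, unsigned long i)
	{
		return i < r->last ? i + 1 : 0;
	}

	/* Consumer: returns NULL when no free sendctx is available,
	 * telling the caller to pause and try again. */
	static struct rpcrdma_sendctx *ring_get(struct sendctx_ring *r)
	{
		unsigned long next = ring_next(r, r->head);

		if (next == r->tail)
			return NULL;
		r->head = next;
		return r->ctxs[next];
	}

	/* Producer: a Send completion returns slots by advancing the
	 * tail up to and including the slot that just completed. */
	static void ring_put(struct sendctx_ring *r, struct rpcrdma_sendctx *sc)
	{
		unsigned long next = r->tail;

		do {
			next = ring_next(r, next);
			/* the real code DMA-unmaps r->ctxs[next] here */
		} while (r->ctxs[next] != sc);
		r->tail = next;		/* real code: smp_store_release() */
	}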

When the sendctx queue empties completely, there are enough Sends
outstanding that posting more Send operations can result in a Send
Queue overflow. In this case, the ULP is told to wait and try again.
This introduces strong Send Queue accounting to xprtrdma.

As a final touch, Jason Gunthorpe <jgunthorpe@obsidianresearch.com>
suggested a mechanism that does not require signaling every Send.
We signal once every N Sends, and perform SGE unmapping of N Send
operations during that one completion.
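
A sketch of that signaling policy, following the logic added to
rpcrdma_ep_post() in the diff below (rep_send_count and
rep_send_batch are the per-endpoint counters this patch introduces;
the helper name is illustrative only):

	/* Signal one Send out of every rep_send_batch. The single
	 * completion that results lets the put path above unmap the
	 * SGEs of all the intervening unsignaled Sends.
	 */
	static void set_send_signaling(struct rpcrdma_ep *ep,
				       struct ib_send_wr *send_wr)
	{
		if (!ep->rep_send_count) {
			send_wr->send_flags |= IB_SEND_SIGNALED;
			ep->rep_send_count = ep->rep_send_batch;
		} else {
			send_wr->send_flags &= ~IB_SEND_SIGNALED;
			--ep->rep_send_count;
		}
	}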

Reported-by: Sagi Grimberg <sagi@grimberg.me>
Suggested-by: Jason Gunthorpe <jgunthorpe@obsidianresearch.com>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Acked-by: Thomas Bogendoerfer <tbogendoerfer@suse.de>
---
 net/sunrpc/xprtrdma/rpc_rdma.c  |   40 ++++----
 net/sunrpc/xprtrdma/transport.c |    6 -
 net/sunrpc/xprtrdma/verbs.c     |  195 ++++++++++++++++++++++++++++++++++++++--
 net/sunrpc/xprtrdma/xprt_rdma.h |   38 ++++++-
 4 files changed, 247 insertions(+), 32 deletions(-)

--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -512,23 +512,26 @@ rpcrdma_encode_reply_chunk(struct rpcrdm
 }
 
 /**
- * rpcrdma_unmap_sges - DMA-unmap Send buffers
- * @ia: interface adapter (device)
- * @req: req with possibly some SGEs to be DMA unmapped
+ * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * @sc: sendctx containing SGEs to unmap
  *
  */
 void
-rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
 {
+	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
 	struct ib_sge *sge;
 	unsigned int count;
 
+	dprintk("RPC:       %s: unmapping %u sges for sc=%p\n",
+		__func__, sc->sc_unmap_count, sc);
+
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
 	 */
-	sge = &req->rl_send_sge[2];
-	for (count = req->rl_mapped_sges; count--; sge++)
+	sge = &sc->sc_sges[2];
+	for (count = sc->sc_unmap_count; count; ++sge, --count)
 		ib_dma_unmap_page(ia->ri_device,
 				  sge->addr, sge->length, DMA_TO_DEVICE);
 }
@@ -539,8 +542,9 @@ static bool
 rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 			u32 len)
 {
+	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
-	struct ib_sge *sge = &req->rl_send_sge[0];
+	struct ib_sge *sge = sc->sc_sges;
 
 	if (!rpcrdma_dma_map_regbuf(ia, rb))
 		goto out_regbuf;
@@ -550,7 +554,7 @@ rpcrdma_prepare_hdr_sge(struct rpcrdma_i
 
 	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
 				      sge->length, DMA_TO_DEVICE);
-	req->rl_send_wr.num_sge++;
+	sc->sc_wr.num_sge++;
 	return true;
 
 out_regbuf:
@@ -565,10 +569,11 @@ static bool
 rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
+	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	unsigned int sge_no, page_base, len, remaining;
 	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 	struct ib_device *device = ia->ri_device;
-	struct ib_sge *sge = req->rl_send_sge;
+	struct ib_sge *sge = sc->sc_sges;
 	u32 lkey = ia->ri_pd->local_dma_lkey;
 	struct page *page, **ppages;
 
@@ -631,7 +636,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_
 			sge[sge_no].length = len;
 			sge[sge_no].lkey = lkey;
 
-			req->rl_mapped_sges++;
+			sc->sc_unmap_count++;
 			ppages++;
 			remaining -= len;
 			page_base = 0;
@@ -657,11 +662,11 @@ map_tail:
 			goto out_mapping_err;
 		sge[sge_no].length = len;
 		sge[sge_no].lkey = lkey;
-		req->rl_mapped_sges++;
+		sc->sc_unmap_count++;
 	}
 
 out:
-	req->rl_send_wr.num_sge += sge_no;
+	sc->sc_wr.num_sge += sge_no;
 	return true;
 
 out_regbuf:
@@ -669,12 +674,12 @@ out_regbuf:
 	return false;
 
 out_mapping_overflow:
-	rpcrdma_unmap_sges(ia, req);
+	rpcrdma_unmap_sendctx(sc);
 	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
 	return false;
 
 out_mapping_err:
-	rpcrdma_unmap_sges(ia, req);
+	rpcrdma_unmap_sendctx(sc);
 	pr_err("rpcrdma: Send mapping error\n");
 	return false;
 }
@@ -694,8 +699,11 @@ rpcrdma_prepare_send_sges(struct rpcrdma
 			  struct rpcrdma_req *req, u32 hdrlen,
 			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
-	req->rl_send_wr.num_sge = 0;
-	req->rl_mapped_sges = 0;
+	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+	if (!req->rl_sendctx)
+		return -ENOBUFS;
+	req->rl_sendctx->sc_wr.num_sge = 0;
+	req->rl_sendctx->sc_unmap_count = 0;
 
 	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
 		return -EIO;
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -687,7 +687,6 @@ xprt_rdma_free(struct rpc_task *task)
 
 	if (!list_empty(&req->rl_registered))
 		ia->ri_ops->ro_unmap_sync(r_xprt, &req->rl_registered);
-	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
 }
 
@@ -790,11 +789,12 @@ void xprt_rdma_print_stats(struct rpc_xp
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
-	seq_printf(seq, "%lu %lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %lu %lu\n",
 		   r_xprt->rx_stats.mrs_recovered,
 		   r_xprt->rx_stats.mrs_orphaned,
 		   r_xprt->rx_stats.mrs_allocated,
-		   r_xprt->rx_stats.local_inv_needed);
+		   r_xprt->rx_stats.local_inv_needed,
+		   r_xprt->rx_stats.empty_sendctx_q);
 }
 
 static int
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -52,6 +52,8 @@
 #include <linux/prefetch.h>
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/svc_rdma.h>
+
+#include <asm-generic/barrier.h>
 #include <asm/bitops.h>
 
 #include <rdma/ib_cm.h>
@@ -126,11 +128,17 @@ rpcrdma_qp_async_error_upcall(struct ib_
 static void
 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_sendctx *sc =
+		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
+
 	/* WARNING: Only wr_cqe and status are reliable at this point */
 	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
 		       ib_wc_status_msg(wc->status),
 		       wc->status, wc->vendor_err);
+
+	rpcrdma_sendctx_put_locked(sc);
 }
 
 /**
@@ -542,6 +550,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep,
 		ep->rep_attr.cap.max_recv_sge);
 
 	/* set trigger for requesting send completion */
+	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
+				   cdata->max_requests >> 2);
+	ep->rep_send_count = ep->rep_send_batch;
 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 	if (ep->rep_cqinit <= 2)
 		ep->rep_cqinit = 0;	/* always signal? */
@@ -824,6 +835,168 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep
 	ib_drain_qp(ia->ri_id->qp);
 }
 
+/* Fixed-size circular FIFO queue. This implementation is wait-free and
+ * lock-free.
+ *
+ * Consumer is the code path that posts Sends. This path dequeues a
+ * sendctx for use by a Send operation. Multiple consumer threads
+ * are serialized by the RPC transport lock, which allows only one
+ * ->send_request call at a time.
+ *
+ * Producer is the code path that handles Send completions. This path
+ * enqueues a sendctx that has been completed. Multiple producer
+ * threads are serialized by the ib_poll_cq() function.
+ */
+
+/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
+ * queue activity, and ib_drain_qp has flushed all remaining Send
+ * requests.
+ */
+static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
+{
+	unsigned long i;
+
+	for (i = 0; i <= buf->rb_sc_last; i++)
+		kfree(buf->rb_sc_ctxs[i]);
+	kfree(buf->rb_sc_ctxs);
+}
+
+static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
+{
+	struct rpcrdma_sendctx *sc;
+
+	sc = kzalloc(sizeof(*sc) +
+		     ia->ri_max_send_sges * sizeof(struct ib_sge),
+		     GFP_KERNEL);
+	if (!sc)
+		return NULL;
+
+	sc->sc_wr.wr_cqe = &sc->sc_cqe;
+	sc->sc_wr.sg_list = sc->sc_sges;
+	sc->sc_wr.opcode = IB_WR_SEND;
+	sc->sc_cqe.done = rpcrdma_wc_send;
+	return sc;
+}
+
+static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_sendctx *sc;
+	unsigned long i;
+
+	/* Maximum number of concurrent outstanding Send WRs. Capping
+	 * the circular queue size stops Send Queue overflow by causing
+	 * the ->send_request call to fail temporarily before too many
+	 * Sends are posted.
+	 */
+	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
+	dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
+	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
+	if (!buf->rb_sc_ctxs)
+		return -ENOMEM;
+
+	buf->rb_sc_last = i - 1;
+	for (i = 0; i <= buf->rb_sc_last; i++) {
+		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
+		if (!sc)
+			goto out_destroy;
+
+		sc->sc_xprt = r_xprt;
+		buf->rb_sc_ctxs[i] = sc;
+	}
+
+	return 0;
+
+out_destroy:
+	rpcrdma_sendctxs_destroy(buf);
+	return -ENOMEM;
+}
+
+/* The sendctx queue is not guaranteed to have a size that is a
+ * power of two, thus the helpers in circ_buf.h cannot be used.
+ * The other option is to use modulus (%), which can be expensive.
+ */
+static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
+					  unsigned long item)
+{
+	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
+}
+
+/**
+ * rpcrdma_sendctx_get_locked - Acquire a send context
+ * @buf: transport buffers from which to acquire an unused context
+ *
+ * Returns pointer to a free send completion context; or NULL if
+ * the queue is empty.
+ *
+ * Usage: Called to acquire an SGE array before preparing a Send WR.
+ *
+ * The caller serializes calls to this function (per rpcrdma_buffer),
+ * and provides an effective memory barrier that flushes the new value
+ * of rb_sc_head.
+ */
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_xprt *r_xprt;
+	struct rpcrdma_sendctx *sc;
+	unsigned long next_head;
+
+	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
+
+	if (next_head == READ_ONCE(buf->rb_sc_tail))
+		goto out_emptyq;
+
+	/* ORDER: item must be accessed _before_ head is updated */
+	sc = buf->rb_sc_ctxs[next_head];
+
+	/* Releasing the lock in the caller acts as a memory
+	 * barrier that flushes rb_sc_head.
+	 */
+	buf->rb_sc_head = next_head;
+
+	return sc;
+
+out_emptyq:
+	/* The queue is "empty" if there have not been enough Send
+	 * completions recently. This is a sign the Send Queue is
+	 * backing up. Cause the caller to pause and try again.
+	 */
+	dprintk("RPC:       %s: empty sendctx queue\n", __func__);
+	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
+	r_xprt->rx_stats.empty_sendctx_q++;
+	return NULL;
+}
+
+/**
+ * rpcrdma_sendctx_put_locked - Release a send context
+ * @sc: send context to release
+ *
+ * Usage: Called from Send completion to return a sendctx
+ * to the queue.
+ *
+ * The caller serializes calls to this function (per rpcrdma_buffer).
+ */
+void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+{
+	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+	unsigned long next_tail;
+
+	/* Unmap SGEs of previously completed but unsignaled
+	 * Sends by walking up the queue until @sc is found.
+	 */
+	next_tail = buf->rb_sc_tail;
+	do {
+		next_tail = rpcrdma_sendctx_next(buf, next_tail);
+
+		/* ORDER: item must be accessed _before_ tail is updated */
+		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
+
+	} while (buf->rb_sc_ctxs[next_tail] != sc);
+
+	/* Paired with READ_ONCE */
+	smp_store_release(&buf->rb_sc_tail, next_tail);
+}
+
 static void
 rpcrdma_mr_recovery_worker(struct work_struct *work)
 {
@@ -919,13 +1092,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *
 	spin_lock(&buffer->rb_reqslock);
 	list_add(&req->rl_all, &buffer->rb_allreqs);
 	spin_unlock(&buffer->rb_reqslock);
-	req->rl_cqe.done = rpcrdma_wc_send;
 	req->rl_buffer = &r_xprt->rx_buf;
 	INIT_LIST_HEAD(&req->rl_registered);
-	req->rl_send_wr.next = NULL;
-	req->rl_send_wr.wr_cqe = &req->rl_cqe;
-	req->rl_send_wr.sg_list = req->rl_send_sge;
-	req->rl_send_wr.opcode = IB_WR_SEND;
 	return req;
 }
 
@@ -1017,6 +1185,10 @@ rpcrdma_buffer_create(struct rpcrdma_xpr
 		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 	}
 
+	rc = rpcrdma_sendctxs_create(r_xprt);
+	if (rc)
+		goto out;
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1093,6 +1265,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_bu
 	cancel_delayed_work_sync(&buf->rb_recovery_worker);
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	rpcrdma_sendctxs_destroy(buf);
+
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;
 
@@ -1208,7 +1382,6 @@ rpcrdma_buffer_put(struct rpcrdma_req *r
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
 	struct rpcrdma_rep *rep = req->rl_reply;
 
-	req->rl_send_wr.num_sge = 0;
 	req->rl_reply = NULL;
 
 	spin_lock(&buffers->rb_lock);
@@ -1340,7 +1513,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 		struct rpcrdma_ep *ep,
 		struct rpcrdma_req *req)
 {
-	struct ib_send_wr *send_wr = &req->rl_send_wr;
+	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
 	struct ib_send_wr *send_wr_fail;
 	int rc;
 
@@ -1354,7 +1527,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	dprintk("RPC:       %s: posting %d s/g entries\n",
 		__func__, send_wr->num_sge);
 
-	rpcrdma_set_signaled(ep, send_wr);
+	if (!ep->rep_send_count) {
+		send_wr->send_flags |= IB_SEND_SIGNALED;
+		ep->rep_send_count = ep->rep_send_batch;
+	} else {
+		send_wr->send_flags &= ~IB_SEND_SIGNALED;
+		--ep->rep_send_count;
+	}
 	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
 	if (rc)
 		goto out_postsend_err;
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -93,6 +93,8 @@ enum {
  */
 
 struct rpcrdma_ep {
+	unsigned int		rep_send_count;
+	unsigned int		rep_send_batch;
 	atomic_t		rep_cqcount;
 	int			rep_cqinit;
 	int			rep_connected;
@@ -232,6 +234,27 @@ struct rpcrdma_rep {
 	struct ib_recv_wr	rr_recv_wr;
 };
 
+/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
+ */
+struct rpcrdma_xprt;
+struct rpcrdma_sendctx {
+	struct ib_send_wr	sc_wr;
+	struct ib_cqe		sc_cqe;
+	struct rpcrdma_xprt	*sc_xprt;
+	unsigned int		sc_unmap_count;
+	struct ib_sge		sc_sges[];
+};
+
+/* Limit the number of SGEs that can be unmapped during one
+ * Send completion. This caps the amount of work a single
+ * completion can do before returning to the provider.
+ *
+ * Setting this to zero disables Send completion batching.
+ */
+enum {
+	RPCRDMA_MAX_SEND_BATCH = 7,
+};
+
 /*
  * struct rpcrdma_mw - external memory region metadata
  *
@@ -343,19 +366,16 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_list;
-	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;
 	struct xdr_stream	rl_stream;
 	struct xdr_buf		rl_hdrbuf;
-	struct ib_send_wr	rl_send_wr;
-	struct ib_sge		rl_send_sge[RPCRDMA_MAX_SEND_SGES];
+	struct rpcrdma_sendctx	*rl_sendctx;
 	struct rpcrdma_regbuf	*rl_rdmabuf;	/* xprt header */
 	struct rpcrdma_regbuf	*rl_sendbuf;	/* rq_snd_buf */
 	struct rpcrdma_regbuf	*rl_recvbuf;	/* rq_rcv_buf */
 
-	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;
 	bool			rl_backchannel;
 
@@ -402,6 +422,11 @@ struct rpcrdma_buffer {
 	struct list_head	rb_mws;
 	struct list_head	rb_all;
 
+	unsigned long		rb_sc_head;
+	unsigned long		rb_sc_tail;
+	unsigned long		rb_sc_last;
+	struct rpcrdma_sendctx	**rb_sc_ctxs;
+
 	spinlock_t		rb_lock;	/* protect buf lists */
 	int			rb_send_count, rb_recv_count;
 	struct list_head	rb_send_bufs;
@@ -456,6 +481,7 @@ struct rpcrdma_stats {
 	unsigned long		mrs_recovered;
 	unsigned long		mrs_orphaned;
 	unsigned long		mrs_allocated;
+	unsigned long		empty_sendctx_q;
 
 	/* accessed when receiving a reply */
 	unsigned long long	total_rdma_reply;
@@ -557,6 +583,8 @@ struct rpcrdma_rep *rpcrdma_create_rep(s
 void rpcrdma_destroy_req(struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
+void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
 
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
@@ -617,7 +645,7 @@ int rpcrdma_prepare_send_sges(struct rpc
 			      struct rpcrdma_req *req, u32 hdrlen,
 			      struct xdr_buf *xdr,
 			      enum rpcrdma_chunktype rtype);
-void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
+void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);