Blob Blame History Raw
From: Chuck Lever <chuck.lever@oracle.com>
Date: Mon, 7 May 2018 15:28:04 -0400
Subject: svcrdma: Introduce svc_rdma_send_ctxt
Patch-mainline: v4.18-rc1
Git-commit: 4201c7464753827803366b40e82eb050c04ebdef
References: bsc#1103992 FATE#326009

svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt
free list. This eliminates the overhead of calling kmalloc / kfree,
both of which grab a globally shared lock that disables interrupts.
Introduce a replacement to svc_rdma_op_ctxt's that is built
especially for the svcrdma Send path.

Subsequent patches will take advantage of this new structure by
allocating real resources which are then cached in these objects.
The allocations are freed when the transport is torn down.

I've renamed the structure so that static type checking can be used
to ensure that uses of op_ctxt and send_ctxt are not confused. As an
additional clean up, structure fields are renamed to conform with
kernel coding conventions.

Additional clean ups:
- Handle svc_rdma_send_ctxt_get allocation failure at each call
  site, rather than pre-allocating and hoping we guessed correctly
- All send_ctxt_put call-sites request page freeing, so remove
  the @free_pages argument
- All send_ctxt_put call-sites unmap SGEs, so fold that into
  svc_rdma_send_ctxt_put

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
Acked-by: Thomas Bogendoerfer <tbogendoerfer@suse.de>
---
 include/linux/sunrpc/svc_rdma.h            |   35 ++--
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |   13 -
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |   13 -
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |  252 ++++++++++++++++++++++++-----
 net/sunrpc/xprtrdma/svc_rdma_transport.c   |  205 -----------------------
 5 files changed, 253 insertions(+), 265 deletions(-)

--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -109,8 +109,8 @@ struct svcxprt_rdma {
 
 	struct ib_pd         *sc_pd;
 
-	spinlock_t	     sc_ctxt_lock;
-	struct list_head     sc_ctxts;
+	spinlock_t	     sc_send_lock;
+	struct list_head     sc_send_ctxts;
 	int		     sc_ctxt_used;
 	spinlock_t	     sc_rw_ctxt_lock;
 	struct list_head     sc_rw_ctxts;
@@ -158,6 +158,19 @@ struct svc_rdma_recv_ctxt {
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
+enum {
+	RPCRDMA_MAX_SGES	= 1 + (RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE),
+};
+
+struct svc_rdma_send_ctxt {
+	struct list_head	sc_list;
+	struct ib_send_wr	sc_send_wr;
+	struct ib_cqe		sc_cqe;
+	int			sc_page_count;
+	struct page		*sc_pages[RPCSVC_MAXPAGES];
+	struct ib_sge		sc_sges[RPCRDMA_MAX_SGES];
+};
+
 /* svc_rdma_backchannel.c */
 extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
 				    __be32 *rdma_resp,
@@ -183,24 +196,22 @@ extern int svc_rdma_send_reply_chunk(str
 				     struct xdr_buf *xdr);
 
 /* svc_rdma_sendto.c */
+extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_send_ctxt *
+		svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma);
+extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+				   struct svc_rdma_send_ctxt *ctxt);
+extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
 extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-				  struct svc_rdma_op_ctxt *ctxt,
+				  struct svc_rdma_send_ctxt *ctxt,
 				  __be32 *rdma_resp, unsigned int len);
 extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
-				 struct svc_rdma_op_ctxt *ctxt,
+				 struct svc_rdma_send_ctxt *ctxt,
 				 u32 inv_rkey);
 extern int svc_rdma_sendto(struct svc_rqst *);
 
 /* svc_rdma_transport.c */
-extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
-extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
-extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
-extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
-extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
 extern void svc_sq_reap(struct svcxprt_rdma *);
 extern void svc_rq_reap(struct svcxprt_rdma *);
 extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Oracle.  All rights reserved.
+ * Copyright (c) 2015-2018 Oracle.  All rights reserved.
  *
  * Support for backward direction RPCs on RPC/RDMA (server-side).
  */
@@ -116,10 +116,14 @@ out_notfound:
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 			      struct rpc_rqst *rqst)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	int ret;
 
-	ctxt = svc_rdma_get_context(rdma);
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt) {
+		ret = -ENOMEM;
+		goto out_err;
+	}
 
 	/* rpcrdma_bc_send_request builds the transport header and
 	 * the backchannel RPC message in the same buffer. Thus only
@@ -143,8 +147,7 @@ out_err:
 	return ret;
 
 out_unmap:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
 	ret = -EIO;
 	goto out_err;
 }
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -601,7 +601,7 @@ static void rdma_read_complete(struct sv
 static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 				__be32 *rdma_argp, int status)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p, *err_msgp;
 	unsigned int length;
 	struct page *page;
@@ -631,7 +631,10 @@ static void svc_rdma_send_error(struct s
 	length = (unsigned long)p - (unsigned long)err_msgp;
 
 	/* Map transport header; no RPC message payload */
-	ctxt = svc_rdma_get_context(xprt);
+	ctxt = svc_rdma_send_ctxt_get(xprt);
+	if (!ctxt)
+		return;
+
 	ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
 	if (ret) {
 		dprintk("svcrdma: Error %d mapping send for protocol error\n",
@@ -640,10 +643,8 @@ static void svc_rdma_send_error(struct s
 	}
 
 	ret = svc_rdma_post_send_wr(xprt, ctxt, 0);
-	if (ret) {
-		svc_rdma_unmap_dma(ctxt);
-		svc_rdma_put_context(ctxt, 1);
-	}
+	if (ret)
+		svc_rdma_send_ctxt_put(xprt, ctxt);
 }
 
 /* By convention, backchannel calls arrive via rdma_msg type
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -75,11 +75,11 @@
  * DMA-unmap the pages under I/O for that Write segment. The Write
  * completion handler does not release any pages.
  *
- * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
  * The ownership of all of the Reply's pages are transferred into that
  * ctxt, the Send WR is posted, and sendto returns.
  *
- * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * The svc_rdma_send_ctxt is presented when the Send WR completes. The
  * Send completion handler finally releases the Reply's pages.
  *
  * This mechanism also assumes that completions on the transport's Send
@@ -114,6 +114,184 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
+
+static inline struct svc_rdma_send_ctxt *
+svc_rdma_next_send_ctxt(struct list_head *list)
+{
+	return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
+					sc_list);
+}
+
+static struct svc_rdma_send_ctxt *
+svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_send_ctxt *ctxt;
+	int i;
+
+	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+	if (!ctxt)
+		return NULL;
+
+	ctxt->sc_cqe.done = svc_rdma_wc_send;
+	ctxt->sc_send_wr.next = NULL;
+	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
+	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
+	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
+	for (i = 0; i < ARRAY_SIZE(ctxt->sc_sges); i++)
+		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
+	return ctxt;
+}
+
+/**
+ * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
+ * @rdma: svcxprt_rdma being torn down
+ *
+ */
+void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_send_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
+		list_del(&ctxt->sc_list);
+		kfree(ctxt);
+	}
+}
+
+/**
+ * svc_rdma_send_ctxt_get - Get a free send_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a ready-to-use send_ctxt, or NULL if none are
+ * available and a fresh one cannot be allocated.
+ */
+struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_send_ctxt *ctxt;
+
+	spin_lock(&rdma->sc_send_lock);
+	ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
+	if (!ctxt)
+		goto out_empty;
+	list_del(&ctxt->sc_list);
+	spin_unlock(&rdma->sc_send_lock);
+
+out:
+	ctxt->sc_send_wr.num_sge = 0;
+	ctxt->sc_page_count = 0;
+	return ctxt;
+
+out_empty:
+	spin_unlock(&rdma->sc_send_lock);
+	ctxt = svc_rdma_send_ctxt_alloc(rdma);
+	if (!ctxt)
+		return NULL;
+	goto out;
+}
+
+/**
+ * svc_rdma_send_ctxt_put - Return send_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ *
+ * Pages left in sc_pages are DMA unmapped and released.
+ */
+void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+			    struct svc_rdma_send_ctxt *ctxt)
+{
+	struct ib_device *device = rdma->sc_cm_id->device;
+	unsigned int i;
+
+	for (i = 0; i < ctxt->sc_send_wr.num_sge; i++)
+		ib_dma_unmap_page(device,
+				  ctxt->sc_sges[i].addr,
+				  ctxt->sc_sges[i].length,
+				  DMA_TO_DEVICE);
+
+	for (i = 0; i < ctxt->sc_page_count; ++i)
+		put_page(ctxt->sc_pages[i]);
+
+	spin_lock(&rdma->sc_send_lock);
+	list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
+	spin_unlock(&rdma->sc_send_lock);
+}
+
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: Completion Queue context
+ * @wc: Work Completion object
+ *
+ * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
+ * the Send completion handler could be running.
+ */
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct svcxprt_rdma *rdma = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_send_ctxt *ctxt;
+
+	trace_svcrdma_wc_send(wc);
+
+	atomic_inc(&rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+
+	ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		svc_xprt_enqueue(&rdma->sc_xprt);
+		if (wc->status != IB_WC_WR_FLUSH_ERR)
+			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
+			       ib_wc_status_msg(wc->status),
+			       wc->status, wc->vendor_err);
+	}
+
+	svc_xprt_put(&rdma->sc_xprt);
+}
+
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+{
+	struct ib_send_wr *bad_wr, *n_wr;
+	int wr_count;
+	int i;
+	int ret;
+
+	wr_count = 1;
+	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
+		wr_count++;
+
+	/* If the SQ is full, wait until an SQ entry is available */
+	while (1) {
+		if ((atomic_sub_return(wr_count, &rdma->sc_sq_avail) < 0)) {
+			atomic_inc(&rdma_stat_sq_starve);
+			trace_svcrdma_sq_full(rdma);
+			atomic_add(wr_count, &rdma->sc_sq_avail);
+			wait_event(rdma->sc_send_wait,
+				   atomic_read(&rdma->sc_sq_avail) > wr_count);
+			if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+				return -ENOTCONN;
+			trace_svcrdma_sq_retry(rdma);
+			continue;
+		}
+		/* Take a transport ref for each WR posted */
+		for (i = 0; i < wr_count; i++)
+			svc_xprt_get(&rdma->sc_xprt);
+
+		/* Bump used SQ WR count and post */
+		ret = ib_post_send(rdma->sc_qp, wr, &bad_wr);
+		trace_svcrdma_post_send(wr, ret);
+		if (ret) {
+			set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+			for (i = 0; i < wr_count; i++)
+				svc_xprt_put(&rdma->sc_xprt);
+			wake_up(&rdma->sc_send_wait);
+		}
+		break;
+	}
+	return ret;
+}
+
 static u32 xdr_padsize(u32 len)
 {
 	return (len & 3) ? (4 - (len & 3)) : 0;
@@ -303,7 +481,7 @@ static u32 svc_rdma_get_inv_rkey(__be32
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
-				 struct svc_rdma_op_ctxt *ctxt,
+				 struct svc_rdma_send_ctxt *ctxt,
 				 unsigned int sge_no,
 				 struct page *page,
 				 unsigned long offset,
@@ -316,10 +494,9 @@ static int svc_rdma_dma_map_page(struct
 	if (ib_dma_mapping_error(dev, dma_addr))
 		goto out_maperr;
 
-	ctxt->sge[sge_no].addr = dma_addr;
-	ctxt->sge[sge_no].length = len;
-	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
-	ctxt->mapped_sges++;
+	ctxt->sc_sges[sge_no].addr = dma_addr;
+	ctxt->sc_sges[sge_no].length = len;
+	ctxt->sc_send_wr.num_sge++;
 	return 0;
 
 out_maperr:
@@ -331,7 +508,7 @@ out_maperr:
  * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
  */
 static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
-				struct svc_rdma_op_ctxt *ctxt,
+				struct svc_rdma_send_ctxt *ctxt,
 				unsigned int sge_no,
 				unsigned char *base,
 				unsigned int len)
@@ -352,14 +529,13 @@ static int svc_rdma_dma_map_buf(struct s
  *	%-EIO if DMA mapping failed.
  */
 int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-			   struct svc_rdma_op_ctxt *ctxt,
+			   struct svc_rdma_send_ctxt *ctxt,
 			   __be32 *rdma_resp,
 			   unsigned int len)
 {
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->pages[0] = virt_to_page(rdma_resp);
-	ctxt->count = 1;
-	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
+	ctxt->sc_pages[0] = virt_to_page(rdma_resp);
+	ctxt->sc_page_count++;
+	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->sc_pages[0], 0, len);
 }
 
 /* Load the xdr_buf into the ctxt's sge array, and DMA map each
@@ -368,7 +544,7 @@ int svc_rdma_map_reply_hdr(struct svcxpr
  * Returns zero on success, or a negative errno on failure.
  */
 static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
-				  struct svc_rdma_op_ctxt *ctxt,
+				  struct svc_rdma_send_ctxt *ctxt,
 				  struct xdr_buf *xdr, __be32 *wr_lst)
 {
 	unsigned int len, sge_no, remaining;
@@ -436,13 +612,13 @@ tail:
  * so they are released by the Send completion handler.
  */
 static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
-				   struct svc_rdma_op_ctxt *ctxt)
+				   struct svc_rdma_send_ctxt *ctxt)
 {
 	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
 
-	ctxt->count += pages;
+	ctxt->sc_page_count += pages;
 	for (i = 0; i < pages; i++) {
-		ctxt->pages[i + 1] = rqstp->rq_respages[i];
+		ctxt->sc_pages[i + 1] = rqstp->rq_respages[i];
 		rqstp->rq_respages[i] = NULL;
 	}
 	rqstp->rq_next_page = rqstp->rq_respages + 1;
@@ -461,37 +637,29 @@ static void svc_rdma_save_io_pages(struc
  *	%-ENOMEM if ib_post_send failed.
  */
 int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
-			  struct svc_rdma_op_ctxt *ctxt,
+			  struct svc_rdma_send_ctxt *ctxt,
 			  u32 inv_rkey)
 {
-	struct ib_send_wr *send_wr = &ctxt->send_wr;
-
 	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
-		ctxt->mapped_sges);
+		ctxt->sc_send_wr.num_sge);
 
-	send_wr->next = NULL;
-	ctxt->cqe.done = svc_rdma_wc_send;
-	send_wr->wr_cqe = &ctxt->cqe;
-	send_wr->sg_list = ctxt->sge;
-	send_wr->num_sge = ctxt->mapped_sges;
-	send_wr->send_flags = IB_SEND_SIGNALED;
 	if (inv_rkey) {
-		send_wr->opcode = IB_WR_SEND_WITH_INV;
-		send_wr->ex.invalidate_rkey = inv_rkey;
+		ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+		ctxt->sc_send_wr.ex.invalidate_rkey = inv_rkey;
 	} else {
-		send_wr->opcode = IB_WR_SEND;
+		ctxt->sc_send_wr.opcode = IB_WR_SEND;
 	}
 
-	return svc_rdma_send(rdma, send_wr);
+	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
 }
 
 /* Prepare the portion of the RPC Reply that will be transmitted
  * via RDMA Send. The RPC-over-RDMA transport header is prepared
- * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
  *
  * Depending on whether a Write list or Reply chunk is present,
  * the server may send all, a portion of, or none of the xdr_buf.
- * In the latter case, only the transport header (sge[0]) is
+ * In the latter case, only the transport header (sc_sges[0]) is
  * transmitted.
  *
  * RDMA Send is the last step of transmitting an RPC reply. Pages
@@ -508,11 +676,13 @@ static int svc_rdma_send_reply_msg(struc
 				   struct svc_rqst *rqstp,
 				   __be32 *wr_lst, __be32 *rp_ch)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	u32 inv_rkey;
 	int ret;
 
-	ctxt = svc_rdma_get_context(rdma);
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt)
+		return -ENOMEM;
 
 	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
 				     svc_rdma_reply_hdr_len(rdma_resp));
@@ -538,8 +708,7 @@ static int svc_rdma_send_reply_msg(struc
 	return 0;
 
 err:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
 	return ret;
 }
 
@@ -553,11 +722,13 @@ err:
 static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
 				   __be32 *rdma_resp, struct svc_rqst *rqstp)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p;
 	int ret;
 
-	ctxt = svc_rdma_get_context(rdma);
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt)
+		return -ENOMEM;
 
 	/* Replace the original transport header with an
 	 * RDMA_ERROR response. XID etc are preserved.
@@ -580,8 +751,7 @@ static int svc_rdma_send_error_msg(struc
 	return 0;
 
 err:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
 	return ret;
 }
 
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
 /*
+ * Copyright (c) 2015-2018 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
  *
@@ -157,114 +158,6 @@ static void svc_rdma_bc_free(struct svc_
 }
 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 
-static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
-					   gfp_t flags)
-{
-	struct svc_rdma_op_ctxt *ctxt;
-
-	ctxt = kmalloc(sizeof(*ctxt), flags);
-	if (ctxt) {
-		ctxt->xprt = xprt;
-		INIT_LIST_HEAD(&ctxt->list);
-	}
-	return ctxt;
-}
-
-static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
-{
-	unsigned int i;
-
-	i = xprt->sc_sq_depth;
-	while (i--) {
-		struct svc_rdma_op_ctxt *ctxt;
-
-		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
-		if (!ctxt) {
-			dprintk("svcrdma: No memory for RDMA ctxt\n");
-			return false;
-		}
-		list_add(&ctxt->list, &xprt->sc_ctxts);
-	}
-	return true;
-}
-
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
-{
-	struct svc_rdma_op_ctxt *ctxt = NULL;
-
-	spin_lock(&xprt->sc_ctxt_lock);
-	xprt->sc_ctxt_used++;
-	if (list_empty(&xprt->sc_ctxts))
-		goto out_empty;
-
-	ctxt = list_first_entry(&xprt->sc_ctxts,
-				struct svc_rdma_op_ctxt, list);
-	list_del(&ctxt->list);
-	spin_unlock(&xprt->sc_ctxt_lock);
-
-out:
-	ctxt->count = 0;
-	ctxt->mapped_sges = 0;
-	return ctxt;
-
-out_empty:
-	/* Either pre-allocation missed the mark, or send
-	 * queue accounting is broken.
-	 */
-	spin_unlock(&xprt->sc_ctxt_lock);
-
-	ctxt = alloc_ctxt(xprt, GFP_NOIO);
-	if (ctxt)
-		goto out;
-
-	spin_lock(&xprt->sc_ctxt_lock);
-	xprt->sc_ctxt_used--;
-	spin_unlock(&xprt->sc_ctxt_lock);
-	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
-	return NULL;
-}
-
-void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
-{
-	struct svcxprt_rdma *xprt = ctxt->xprt;
-	struct ib_device *device = xprt->sc_cm_id->device;
-	unsigned int i;
-
-	for (i = 0; i < ctxt->mapped_sges; i++)
-		ib_dma_unmap_page(device,
-				  ctxt->sge[i].addr,
-				  ctxt->sge[i].length,
-				  ctxt->direction);
-	ctxt->mapped_sges = 0;
-}
-
-void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
-{
-	struct svcxprt_rdma *xprt = ctxt->xprt;
-	int i;
-
-	if (free_pages)
-		for (i = 0; i < ctxt->count; i++)
-			put_page(ctxt->pages[i]);
-
-	spin_lock(&xprt->sc_ctxt_lock);
-	xprt->sc_ctxt_used--;
-	list_add(&ctxt->list, &xprt->sc_ctxts);
-	spin_unlock(&xprt->sc_ctxt_lock);
-}
-
-static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
-{
-	while (!list_empty(&xprt->sc_ctxts)) {
-		struct svc_rdma_op_ctxt *ctxt;
-
-		ctxt = list_first_entry(&xprt->sc_ctxts,
-					struct svc_rdma_op_ctxt, list);
-		list_del(&ctxt->list);
-		kfree(ctxt);
-	}
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -292,39 +185,6 @@ static void qp_event_handler(struct ib_e
 	}
 }
 
-/**
- * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq:        completion queue
- * @wc:        completed WR
- *
- */
-void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct svcxprt_rdma *xprt = cq->cq_context;
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct svc_rdma_op_ctxt *ctxt;
-
-	trace_svcrdma_wc_send(wc);
-
-	atomic_inc(&xprt->sc_sq_avail);
-	wake_up(&xprt->sc_send_wait);
-
-	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
-
-	if (unlikely(wc->status != IB_WC_SUCCESS)) {
-		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-		svc_xprt_enqueue(&xprt->sc_xprt);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
-	}
-
-	svc_xprt_put(&xprt->sc_xprt);
-}
-
 static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
 						 struct net *net)
 {
@@ -338,14 +198,14 @@ static struct svcxprt_rdma *svc_rdma_cre
 	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
-	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+	INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
-	spin_lock_init(&cma_xprt->sc_ctxt_lock);
+	spin_lock_init(&cma_xprt->sc_send_lock);
 	spin_lock_init(&cma_xprt->sc_recv_lock);
 	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
@@ -639,9 +499,6 @@ static struct svc_xprt *svc_rdma_accept(
 	}
 	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
-	if (!svc_rdma_prealloc_ctxts(newxprt))
-		goto errout;
-
 	newxprt->sc_pd = ib_alloc_pd(dev, 0);
 	if (IS_ERR(newxprt->sc_pd)) {
 		dprintk("svcrdma: error creating PD for connect request\n");
@@ -793,11 +650,6 @@ static void __svc_rdma_free(struct work_
 
 	svc_rdma_flush_recv_queues(rdma);
 
-	/* Warn if we leaked a resource or under-referenced */
-	if (rdma->sc_ctxt_used != 0)
-		pr_err("svcrdma: ctxt still in use? (%d)\n",
-		       rdma->sc_ctxt_used);
-
 	/* Final put of backchannel client transport */
 	if (xprt->xpt_bc_xprt) {
 		xprt_put(xprt->xpt_bc_xprt);
@@ -805,7 +657,7 @@ static void __svc_rdma_free(struct work_
 	}
 
 	svc_rdma_destroy_rw_ctxts(rdma);
-	svc_rdma_destroy_ctxts(rdma);
+	svc_rdma_send_ctxts_destroy(rdma);
 	svc_rdma_recv_ctxts_destroy(rdma);
 
 	/* Destroy the QP if present (not a listener) */
@@ -859,52 +711,3 @@ static int svc_rdma_secure_port(struct s
 static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
 {
 }
-
-int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
-{
-	struct ib_send_wr *bad_wr, *n_wr;
-	int wr_count;
-	int i;
-	int ret;
-
-	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-		return -ENOTCONN;
-
-	wr_count = 1;
-	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
-		wr_count++;
-
-	/* If the SQ is full, wait until an SQ entry is available */
-	while (1) {
-		if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
-			atomic_inc(&rdma_stat_sq_starve);
-			trace_svcrdma_sq_full(xprt);
-			atomic_add(wr_count, &xprt->sc_sq_avail);
-			wait_event(xprt->sc_send_wait,
-				   atomic_read(&xprt->sc_sq_avail) > wr_count);
-			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-				return -ENOTCONN;
-			trace_svcrdma_sq_retry(xprt);
-			continue;
-		}
-		/* Take a transport ref for each WR posted */
-		for (i = 0; i < wr_count; i++)
-			svc_xprt_get(&xprt->sc_xprt);
-
-		/* Bump used SQ WR count and post */
-		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
-		trace_svcrdma_post_send(wr, ret);
-		if (ret) {
-			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			for (i = 0; i < wr_count; i ++)
-				svc_xprt_put(&xprt->sc_xprt);
-			dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
-			dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
-				atomic_read(&xprt->sc_sq_avail),
-				xprt->sc_sq_depth);
-			wake_up(&xprt->sc_send_wait);
-		}
-		break;
-	}
-	return ret;
-}