Blob Blame History Raw
From: Chuck Lever <chuck.lever@oracle.com>
Date: Fri, 20 Oct 2017 10:48:36 -0400
Subject: xprtrdma: RPC completion should wait for Send completion
Patch-mainline: v4.15-rc1
Git-commit: 01bb35c89d90abe6fd1c0be001f84bbdfa7fa7d1
References: bsc#1103992 FATE#326009

When an RPC Call includes a file data payload, that payload can come
from pages in the page cache, or a user buffer (for direct I/O).

If the payload can fit inline, xprtrdma includes it in the Send
using a scatter-gather technique. xprtrdma mustn't allow the RPC
consumer to re-use the memory where that payload resides before the
Send completes. Otherwise, the new contents of that memory would be
exposed by an HCA retransmit of the Send operation.

So, block RPC completion on Send completion, but only in the case
where a separate file data payload is part of the Send. This
prevents the reuse of that memory while it is still part of a Send
operation without an undue cost to other cases.

Waiting is avoided in the common case because typically the Send
will have completed long before the RPC Reply arrives.

These days, an RPC timeout will trigger a disconnect, which tears
down the QP. The disconnect flushes all waiting Sends. This bounds
the amount of time the reply handler has to wait for a Send
completion.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Acked-by: Thomas Bogendoerfer <tbogendoerfer@suse.de>
---
 net/sunrpc/xprtrdma/rpc_rdma.c  |   26 +++++++++++++++++++++++++-
 net/sunrpc/xprtrdma/transport.c |    5 +++--
 net/sunrpc/xprtrdma/verbs.c     |    3 ++-
 net/sunrpc/xprtrdma/xprt_rdma.h |    4 ++++
 4 files changed, 34 insertions(+), 4 deletions(-)

--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -534,6 +534,11 @@ rpcrdma_unmap_sendctx(struct rpcrdma_sen
 	for (count = sc->sc_unmap_count; count; ++sge, --count)
 		ib_dma_unmap_page(ia->ri_device,
 				  sge->addr, sge->length, DMA_TO_DEVICE);
+
+	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
+		smp_mb__after_atomic();
+		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+	}
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -667,6 +672,8 @@ map_tail:
 
 out:
 	sc->sc_wr.num_sge += sge_no;
+	if (sc->sc_unmap_count)
+		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
 	return true;
 
 out_regbuf:
@@ -704,6 +711,8 @@ rpcrdma_prepare_send_sges(struct rpcrdma
 		return -ENOBUFS;
 	req->rl_sendctx->sc_wr.num_sge = 0;
 	req->rl_sendctx->sc_unmap_count = 0;
+	req->rl_sendctx->sc_req = req;
+	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
 
 	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
 		return -EIO;
@@ -1305,6 +1314,20 @@ void rpcrdma_release_rqst(struct rpcrdma
 	if (!list_empty(&req->rl_registered))
 		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
 						    &req->rl_registered);
+
+	/* Ensure that any DMA mapped pages associated with
+	 * the Send of the RPC Call have been unmapped before
+	 * allowing the RPC to complete. This protects argument
+	 * memory not controlled by the RPC client from being
+	 * re-used before we're done with it.
+	 */
+	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+		r_xprt->rx_stats.reply_waits_for_send++;
+		out_of_line_wait_on_bit(&req->rl_flags,
+					RPCRDMA_REQ_F_TX_RESOURCES,
+					bit_wait,
+					TASK_UNINTERRUPTIBLE);
+	}
 }
 
 /* Reply handling runs in the poll worker thread. Anything that
@@ -1384,7 +1407,8 @@ void rpcrdma_reply_handler(struct rpcrdm
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(rep->rr_xid));
 
-	if (list_empty(&req->rl_registered))
+	if (list_empty(&req->rl_registered) &&
+	    !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
 		rpcrdma_complete_rqst(rep);
 	else
 		queue_work(rpcrdma_receive_wq, &rep->rr_work);
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -789,12 +789,13 @@ void xprt_rdma_print_stats(struct rpc_xp
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
-	seq_printf(seq, "%lu %lu %lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
 		   r_xprt->rx_stats.mrs_recovered,
 		   r_xprt->rx_stats.mrs_orphaned,
 		   r_xprt->rx_stats.mrs_allocated,
 		   r_xprt->rx_stats.local_inv_needed,
-		   r_xprt->rx_stats.empty_sendctx_q);
+		   r_xprt->rx_stats.empty_sendctx_q,
+		   r_xprt->rx_stats.reply_waits_for_send);
 }
 
 static int
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1526,7 +1526,8 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	dprintk("RPC:       %s: posting %d s/g entries\n",
 		__func__, send_wr->num_sge);
 
-	if (!ep->rep_send_count) {
+	if (!ep->rep_send_count ||
+	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
 		send_wr->send_flags |= IB_SEND_SIGNALED;
 		ep->rep_send_count = ep->rep_send_batch;
 	} else {
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -236,11 +236,13 @@ struct rpcrdma_rep {
 
 /* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
  */
+struct rpcrdma_req;
 struct rpcrdma_xprt;
 struct rpcrdma_sendctx {
 	struct ib_send_wr	sc_wr;
 	struct ib_cqe		sc_cqe;
 	struct rpcrdma_xprt	*sc_xprt;
+	struct rpcrdma_req	*sc_req;
 	unsigned int		sc_unmap_count;
 	struct ib_sge		sc_sges[];
 };
@@ -387,6 +389,7 @@ struct rpcrdma_req {
 enum {
 	RPCRDMA_REQ_F_BACKCHANNEL = 0,
 	RPCRDMA_REQ_F_PENDING,
+	RPCRDMA_REQ_F_TX_RESOURCES,
 };
 
 static inline void
@@ -492,6 +495,7 @@ struct rpcrdma_stats {
 	/* accessed when receiving a reply */
 	unsigned long long	total_rdma_reply;
 	unsigned long long	fixup_copy_count;
+	unsigned long		reply_waits_for_send;
 	unsigned long		local_inv_needed;
 	unsigned long		nomsg_call_count;
 	unsigned long		bcall_count;