Blob Blame History Raw
From: Karsten Graul <kgraul@linux.ibm.com>
Subject: net/smc: atomic SMCD cursor handling
Patch-mainline: v4.20-rc4
Git-commit: b9a22dd9811dbcddb5623c499e5b736400059df6
References: FATE#325698, LTC#167867, bsc#1113481

Description:  net/smc: bugfix and compatibility patches
Symptom:      Random hangs in smc processing:
                user space application hangs in socket send() or recv() call or
                does never get a notification from a select() call.
              Missing compatibility to other platforms:
                confirm rkey and delete rkey processing is required by the
                design, but delete rkey processing is missing. This leads to
                protocol failures when communicating with other platforms like
                zOS. The SMC-D shutdown signal support is missing, so there is
                no detection if the remote peer closed the link group.
              Broken administration of available WR send payload buffers due to
              a use-after-free condition.
Problem:      Misbehaviour regarding the user space api can lead to hang
              situations. SMC is not fully compatible to some other platforms
              due to missing rkey processing and SMC-D shutdown signal support.
Solution:     Fixed protocoll deficiencies by implementing the required rkey
              processing. For SMC-D, the cursors are now handled atomically to
              handle parallel modifications. The SMC-D shutdown signal is now
              processed when received and sent to the remote peer if needed.
              Prereq patches are included.
Reproduction: Run SMC on a loaded system against zOS as peer system.

Upstream-Description:

              net/smc: atomic SMCD cursor handling

              Running uperf tests with SMCD on LPARs results in corrupted cursors.
              SMCD cursors should be treated atomically to fix cursor corruption.

              Signed-off-by: Ursula Braun <ubraun@linux.ibm.com>
              Signed-off-by: David S. Miller <davem@davemloft.net>


Signed-off-by: Karsten Graul <kgraul@linux.ibm.com>
Acked-by: Petr Tesarik <ptesarik@suse.com>
---
 net/smc/smc_cdc.c |   26 +++++++++++++----------
 net/smc/smc_cdc.h |   60 ++++++++++++++++++++++++++++++++++++++++--------------
 2 files changed, 60 insertions(+), 26 deletions(-)

--- a/net/smc/smc_cdc.c
+++ b/net/smc/smc_cdc.c
@@ -80,7 +80,7 @@ static inline void smc_cdc_add_pending_s
 		sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
 		"must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
 	BUILD_BUG_ON_MSG(
-		sizeof(struct smc_cdc_msg) != SMC_WR_TX_SIZE,
+		offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
 		"must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
 	BUILD_BUG_ON_MSG(
 		sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
@@ -176,23 +176,24 @@ void smc_cdc_tx_dismiss_slots(struct smc
 int smcd_cdc_msg_send(struct smc_connection *conn)
 {
 	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+	union smc_host_cursor curs;
 	struct smcd_cdc_msg cdc;
 	int rc, diff;
 
 	memset(&cdc, 0, sizeof(cdc));
 	cdc.common.type = SMC_CDC_MSG_TYPE;
-	cdc.prod_wrap = conn->local_tx_ctrl.prod.wrap;
-	cdc.prod_count = conn->local_tx_ctrl.prod.count;
-
-	cdc.cons_wrap = conn->local_tx_ctrl.cons.wrap;
-	cdc.cons_count = conn->local_tx_ctrl.cons.count;
-	cdc.prod_flags = conn->local_tx_ctrl.prod_flags;
-	cdc.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
+	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
+	cdc.prod.wrap = curs.wrap;
+	cdc.prod.count = curs.count;
+	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
+	cdc.cons.wrap = curs.wrap;
+	cdc.cons.count = curs.count;
+	cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
+	cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
 	rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
 	if (rc)
 		return rc;
-	smc_curs_copy(&conn->rx_curs_confirmed, &conn->local_tx_ctrl.cons,
-		      conn);
+	smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
 	/* Calculate transmitted data and increment free send buffer space */
 	diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
 			     &conn->tx_curs_sent);
@@ -330,13 +331,16 @@ static void smc_cdc_msg_recv(struct smc_
 static void smcd_cdc_rx_tsklet(unsigned long data)
 {
 	struct smc_connection *conn = (struct smc_connection *)data;
+	struct smcd_cdc_msg *data_cdc;
 	struct smcd_cdc_msg cdc;
 	struct smc_sock *smc;
 
 	if (!conn)
 		return;
 
-	memcpy(&cdc, conn->rmb_desc->cpu_addr, sizeof(cdc));
+	data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
+	smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
+	smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
 	smc = container_of(conn, struct smc_sock, conn);
 	smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
 }
--- a/net/smc/smc_cdc.h
+++ b/net/smc/smc_cdc.h
@@ -47,21 +47,31 @@ struct smc_cdc_msg {
 	struct smc_cdc_producer_flags	prod_flags;
 	struct smc_cdc_conn_state_flags	conn_state_flags;
 	u8				reserved[18];
-} __packed;					/* format defined in RFC7609 */
+};
+
+/* SMC-D cursor format */
+union smcd_cdc_cursor {
+	struct {
+		u16	wrap;
+		u32	count;
+		struct smc_cdc_producer_flags	prod_flags;
+		struct smc_cdc_conn_state_flags	conn_state_flags;
+	} __packed;
+#ifdef KERNEL_HAS_ATOMIC64
+	atomic64_t		acurs;		/* for atomic processing */
+#else
+	u64			acurs;		/* for atomic processing */
+#endif
+} __aligned(8);
 
 /* CDC message for SMC-D */
 struct smcd_cdc_msg {
 	struct smc_wr_rx_hdr common;	/* Type = 0xFE */
 	u8 res1[7];
-	u16 prod_wrap;
-	u32 prod_count;
-	u8 res2[2];
-	u16 cons_wrap;
-	u32 cons_count;
-	struct smc_cdc_producer_flags	prod_flags;
-	struct smc_cdc_conn_state_flags conn_state_flags;
+	union smcd_cdc_cursor	prod;
+	union smcd_cdc_cursor	cons;
 	u8 res3[8];
-} __packed;
+} __aligned(8);
 
 static inline bool smc_cdc_rxed_any_close(struct smc_connection *conn)
 {
@@ -134,6 +144,21 @@ static inline void smc_curs_copy_net(uni
 #endif
 }
 
+static inline void smcd_curs_copy(union smcd_cdc_cursor *tgt,
+				  union smcd_cdc_cursor *src,
+				  struct smc_connection *conn)
+{
+#ifndef KERNEL_HAS_ATOMIC64
+	unsigned long flags;
+
+	spin_lock_irqsave(&conn->acurs_lock, flags);
+	tgt->acurs = src->acurs;
+	spin_unlock_irqrestore(&conn->acurs_lock, flags);
+#else
+	atomic64_set(&tgt->acurs, atomic64_read(&src->acurs));
+#endif
+}
+
 /* calculate cursor difference between old and new, where old <= new */
 static inline int smc_curs_diff(unsigned int size,
 				union smc_host_cursor *old,
@@ -221,12 +246,17 @@ static inline void smcr_cdc_msg_to_host(
 static inline void smcd_cdc_msg_to_host(struct smc_host_cdc_msg *local,
 					struct smcd_cdc_msg *peer)
 {
-	local->prod.wrap = peer->prod_wrap;
-	local->prod.count = peer->prod_count;
-	local->cons.wrap = peer->cons_wrap;
-	local->cons.count = peer->cons_count;
-	local->prod_flags = peer->prod_flags;
-	local->conn_state_flags = peer->conn_state_flags;
+	union smc_host_cursor temp;
+
+	temp.wrap = peer->prod.wrap;
+	temp.count = peer->prod.count;
+	atomic64_set(&local->prod.acurs, atomic64_read(&temp.acurs));
+
+	temp.wrap = peer->cons.wrap;
+	temp.count = peer->cons.count;
+	atomic64_set(&local->cons.acurs, atomic64_read(&temp.acurs));
+	local->prod_flags = peer->cons.prod_flags;
+	local->conn_state_flags = peer->cons.conn_state_flags;
 }
 
 static inline void smc_cdc_msg_to_host(struct smc_host_cdc_msg *local,