Blob Blame History Raw
From: Karsten Graul <kgraul@linux.ibm.com>
Date: Thu, 10 Sep 2020 18:48:29 +0200
Subject: net/smc: use separate work queues for different worker types
Git-commit: 22ef473dbd66a5241b6cedc186abca5a3a4eb922
Patch-mainline: v5.10-rc1
References: jsc#SLE-13763

There are 6 types of workers which exist per smc connection. 3 of them
are used for listen and handshake processing, another 2 are used for
close and abort processing and 1 is the tx worker that moves calls to
sleeping functions into a worker.
To prevent flooding of the system work queue when many connections are
opened or closed at the same time (a pattern that uperf implements), move
those workers to one of 3 smc-specific work queues. Two work queues are
module-global and used for handshake and close workers. The third work
queue is defined per link group and used by the tx workers that may
sleep waiting for resources of this link group.
In addition, smc_llc_enqueue() now queues llc_event_work on the system
high-priority work queue because it's critical that this work is started
quickly.

Reviewed-by: Ursula Braun <ubraun@linux.ibm.com>
Signed-off-by: Karsten Graul <kgraul@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Acked-by: Petr Tesarik <ptesarik@suse.com>
---
 net/smc/af_smc.c   |   34 ++++++++++++++++++++++++++--------
 net/smc/smc.h      |    3 +++
 net/smc/smc_cdc.c  |    4 ++--
 net/smc/smc_core.c |   13 +++++++++++--
 net/smc/smc_core.h |    1 +
 net/smc/smc_llc.c  |    2 +-
 net/smc/smc_tx.c   |   10 +++++-----
 7 files changed, 49 insertions(+), 18 deletions(-)

--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -55,6 +55,9 @@ static DEFINE_MUTEX(smc_client_lgr_pendi
 						 * creation on client
 						 */
 
+struct workqueue_struct	*smc_hs_wq;	/* wq for handshake work */
+struct workqueue_struct	*smc_close_wq;	/* wq for close work */
+
 static void smc_tcp_listen_work(struct work_struct *);
 static void smc_connect_work(struct work_struct *);
 
@@ -905,7 +908,7 @@ static int smc_connect(struct socket *so
 	if (smc->use_fallback)
 		goto out;
 	if (flags & O_NONBLOCK) {
-		if (schedule_work(&smc->connect_work))
+		if (queue_work(smc_hs_wq, &smc->connect_work))
 			smc->connect_nonblock = 1;
 		rc = -EINPROGRESS;
 	} else {
@@ -1412,7 +1415,7 @@ static void smc_tcp_listen_work(struct w
 		new_smc->sk.sk_sndbuf = lsmc->sk.sk_sndbuf;
 		new_smc->sk.sk_rcvbuf = lsmc->sk.sk_rcvbuf;
 		sock_hold(&new_smc->sk); /* sock_put in passive closing */
-		if (!schedule_work(&new_smc->smc_listen_work))
+		if (!queue_work(smc_hs_wq, &new_smc->smc_listen_work))
 			sock_put(&new_smc->sk);
 	}
 
@@ -1432,7 +1435,7 @@ static void smc_clcsock_data_ready(struc
 	lsmc->clcsk_data_ready(listen_clcsock);
 	if (lsmc->sk.sk_state == SMC_LISTEN) {
 		sock_hold(&lsmc->sk); /* sock_put in smc_tcp_listen_work() */
-		if (!schedule_work(&lsmc->tcp_listen_work))
+		if (!queue_work(smc_hs_wq, &lsmc->tcp_listen_work))
 			sock_put(&lsmc->sk);
 	}
 }
@@ -1797,8 +1800,8 @@ static int smc_setsockopt(struct socket
 		    sk->sk_state != SMC_LISTEN &&
 		    sk->sk_state != SMC_CLOSED) {
 			if (val)
-				mod_delayed_work(system_wq, &smc->conn.tx_work,
-						 0);
+				mod_delayed_work(smc->conn.lgr->tx_wq,
+						 &smc->conn.tx_work, 0);
 		}
 		break;
 	case TCP_CORK:
@@ -1806,8 +1809,8 @@ static int smc_setsockopt(struct socket
 		    sk->sk_state != SMC_LISTEN &&
 		    sk->sk_state != SMC_CLOSED) {
 			if (!val)
-				mod_delayed_work(system_wq, &smc->conn.tx_work,
-						 0);
+				mod_delayed_work(smc->conn.lgr->tx_wq,
+						 &smc->conn.tx_work, 0);
 		}
 		break;
 	case TCP_DEFER_ACCEPT:
@@ -2088,10 +2091,19 @@ static int __init smc_init(void)
 	if (rc)
 		goto out_pernet_subsys;
 
+	rc = -ENOMEM;
+	smc_hs_wq = alloc_workqueue("smc_hs_wq", 0, 0);
+	if (!smc_hs_wq)
+		goto out_pnet;
+
+	smc_close_wq = alloc_workqueue("smc_close_wq", 0, 0);
+	if (!smc_close_wq)
+		goto out_alloc_hs_wq;
+
 	rc = smc_core_init();
 	if (rc) {
 		pr_err("%s: smc_core_init fails with %d\n", __func__, rc);
-		goto out_pnet;
+		goto out_alloc_wqs;
 	}
 
 	rc = smc_llc_init();
@@ -2143,6 +2155,10 @@ out_proto:
 	proto_unregister(&smc_proto);
 out_core:
 	smc_core_exit();
+out_alloc_wqs:
+	destroy_workqueue(smc_close_wq);
+out_alloc_hs_wq:
+	destroy_workqueue(smc_hs_wq);
 out_pnet:
 	smc_pnet_exit();
 out_pernet_subsys:
@@ -2157,6 +2173,8 @@ static void __exit smc_exit(void)
 	sock_unregister(PF_SMC);
 	smc_core_exit();
 	smc_ib_unregister_client();
+	destroy_workqueue(smc_close_wq);
+	destroy_workqueue(smc_hs_wq);
 	proto_unregister(&smc_proto6);
 	proto_unregister(&smc_proto);
 	smc_pnet_exit();
--- a/net/smc/smc.h
+++ b/net/smc/smc.h
@@ -239,6 +239,9 @@ static inline struct smc_sock *smc_sk(co
 	return (struct smc_sock *)sk;
 }
 
+extern struct workqueue_struct	*smc_hs_wq;	/* wq for handshake work */
+extern struct workqueue_struct	*smc_close_wq;	/* wq for close work */
+
 #define SMC_SYSTEMID_LEN		8
 
 extern u8	local_systemid[SMC_SYSTEMID_LEN]; /* unique system identifier */
--- a/net/smc/smc_cdc.c
+++ b/net/smc/smc_cdc.c
@@ -299,7 +299,7 @@ static void smc_cdc_msg_validate(struct
 		conn->lnk = link;
 		spin_unlock_bh(&conn->send_lock);
 		sock_hold(&smc->sk); /* sock_put in abort_work */
-		if (!schedule_work(&conn->abort_work))
+		if (!queue_work(smc_close_wq, &conn->abort_work))
 			sock_put(&smc->sk);
 	}
 }
@@ -368,7 +368,7 @@ static void smc_cdc_msg_recv_action(stru
 			smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
 		sock_set_flag(&smc->sk, SOCK_DONE);
 		sock_hold(&smc->sk); /* sock_put in close_work */
-		if (!schedule_work(&conn->close_work))
+		if (!queue_work(smc_close_wq, &conn->close_work))
 			sock_put(&smc->sk);
 	}
 }
--- a/net/smc/smc_core.c
+++ b/net/smc/smc_core.c
@@ -386,6 +386,12 @@ static int smc_lgr_create(struct smc_soc
 		rc = SMC_CLC_DECL_MEM;
 		goto ism_put_vlan;
 	}
+	lgr->tx_wq = alloc_workqueue("smc_tx_wq-%*phN", 0, 0,
+				     SMC_LGR_ID_SIZE, &lgr->id);
+	if (!lgr->tx_wq) {
+		rc = -ENOMEM;
+		goto free_lgr;
+	}
 	lgr->is_smcd = ini->is_smcd;
 	lgr->sync_err = 0;
 	lgr->terminating = 0;
@@ -426,7 +432,7 @@ static int smc_lgr_create(struct smc_soc
 		lnk = &lgr->lnk[link_idx];
 		rc = smcr_link_init(lgr, lnk, link_idx, ini);
 		if (rc)
-			goto free_lgr;
+			goto free_wq;
 		lgr_list = &smc_lgr_list.list;
 		lgr_lock = &smc_lgr_list.lock;
 		atomic_inc(&lgr_cnt);
@@ -437,6 +443,8 @@ static int smc_lgr_create(struct smc_soc
 	spin_unlock_bh(lgr_lock);
 	return 0;
 
+free_wq:
+	destroy_workqueue(lgr->tx_wq);
 free_lgr:
 	kfree(lgr);
 ism_put_vlan:
@@ -506,7 +514,7 @@ static int smc_switch_cursor(struct smc_
 	    smc->sk.sk_state != SMC_CLOSED) {
 		rc = smcr_cdc_msg_send_validation(conn, pend, wr_buf);
 		if (!rc) {
-			schedule_delayed_work(&conn->tx_work, 0);
+			queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work, 0);
 			smc->sk.sk_data_ready(&smc->sk);
 		}
 	} else {
@@ -813,6 +821,7 @@ static void smc_lgr_free(struct smc_link
 	}
 
 	smc_lgr_free_bufs(lgr);
+	destroy_workqueue(lgr->tx_wq);
 	if (lgr->is_smcd) {
 		smc_ism_put_vlan(lgr->smcd, lgr->vlan_id);
 		put_device(&lgr->smcd->dev);
--- a/net/smc/smc_core.h
+++ b/net/smc/smc_core.h
@@ -225,6 +225,7 @@ struct smc_link_group {
 	u8			id[SMC_LGR_ID_SIZE];	/* unique lgr id */
 	struct delayed_work	free_work;	/* delayed freeing of an lgr */
 	struct work_struct	terminate_work;	/* abnormal lgr termination */
+	struct workqueue_struct	*tx_wq;		/* wq for conn. tx workers */
 	u8			sync_err : 1;	/* lgr no longer fits to peer */
 	u8			terminating : 1;/* lgr is terminating */
 	u8			freeing : 1;	/* lgr is being freed */
--- a/net/smc/smc_llc.c
+++ b/net/smc/smc_llc.c
@@ -1691,7 +1691,7 @@ static void smc_llc_enqueue(struct smc_l
 	spin_lock_irqsave(&lgr->llc_event_q_lock, flags);
 	list_add_tail(&qentry->list, &lgr->llc_event_q);
 	spin_unlock_irqrestore(&lgr->llc_event_q_lock, flags);
-	schedule_work(&lgr->llc_event_work);
+	queue_work(system_highpri_wq, &lgr->llc_event_work);
 }
 
 /* copy received msg and add it to the event queue */
--- a/net/smc/smc_tx.c
+++ b/net/smc/smc_tx.c
@@ -228,8 +228,8 @@ int smc_tx_sendmsg(struct smc_sock *smc,
 			/* for a corked socket defer the RDMA writes if there
 			 * is still sufficient sndbuf_space available
 			 */
-			schedule_delayed_work(&conn->tx_work,
-					      SMC_TX_CORK_DELAY);
+			queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
+					   SMC_TX_CORK_DELAY);
 		else
 			smc_tx_sndbuf_nonempty(conn);
 	} /* while (msg_data_left(msg)) */
@@ -499,7 +499,7 @@ static int smcr_tx_sndbuf_nonempty(struc
 			if (conn->killed)
 				return -EPIPE;
 			rc = 0;
-			mod_delayed_work(system_wq, &conn->tx_work,
+			mod_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
 					 SMC_TX_WORK_DELAY);
 		}
 		return rc;
@@ -623,8 +623,8 @@ void smc_tx_consumer_update(struct smc_c
 			return;
 		if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
 		    !conn->killed) {
-			schedule_delayed_work(&conn->tx_work,
-					      SMC_TX_WORK_DELAY);
+			queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
+					   SMC_TX_WORK_DELAY);
 			return;
 		}
 	}