Blob Blame History Raw
From: Julian Wiedmann <jwi@linux.ibm.com>
Date: Fri, 23 Aug 2019 11:48:50 +0200
Subject: s390/qeth: add TX NAPI support for IQD devices
Git-commit: e53edf743d26b39dfd78af43ff97620a4ac13ffc
Patch-mainline: v5.4-rc1
References: jsc#SLE-7795 LTC#179220

Due to their large MTU and potentially low utilization of TX buffers,
IQD devices in particular require fast TX recycling. This makes them
a prime candidate for a TX NAPI path in qeth.

qeth_tx_poll() uses the recently introduced qdio_inspect_queue() helper
to poll the TX queue for completed buffers. To avoid hogging the CPU for
too long, we yield to the stack after completing an entire queue's worth
of buffers.
While IQD is expected to transfer its buffers synchronously (and thus
doesn't support TX interrupts), a timer covers for the odd case where a
TX buffer doesn't complete synchronously. Currently this timer should
only ever fire for
(1) the mcast queue,
(2) the occasional race, where the NAPI poll code observes an update to
    queue->used_buffers while the TX doorbell hasn't been issued yet.

Signed-off-by: Julian Wiedmann <jwi@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Acked-by: Petr Tesarik <ptesarik@suse.com>
---
 arch/s390/include/asm/qdio.h      |    1 
 drivers/s390/net/qeth_core.h      |   26 ++++
 drivers/s390/net/qeth_core_main.c |  202 ++++++++++++++++++++++++++++----------
 drivers/s390/net/qeth_ethtool.c   |    2 
 4 files changed, 183 insertions(+), 48 deletions(-)

--- a/arch/s390/include/asm/qdio.h
+++ b/arch/s390/include/asm/qdio.h
@@ -16,6 +16,7 @@
 #define QDIO_MAX_QUEUES_PER_IRQ		4
 #define QDIO_MAX_BUFFERS_PER_Q		128
 #define QDIO_MAX_BUFFERS_MASK		(QDIO_MAX_BUFFERS_PER_Q - 1)
+#define QDIO_BUFNR(num)			((num) & QDIO_MAX_BUFFERS_MASK)
 #define QDIO_MAX_ELEMENTS_PER_BUFFER	16
 #define QDIO_SBAL_SIZE			256
 
--- a/drivers/s390/net/qeth_core.h
+++ b/drivers/s390/net/qeth_core.h
@@ -22,6 +22,7 @@
 #include <linux/hashtable.h>
 #include <linux/ip.h>
 #include <linux/refcount.h>
+#include <linux/timer.h>
 #include <linux/wait.h>
 #include <linux/workqueue.h>
 
@@ -474,6 +475,8 @@ struct qeth_out_q_stats {
 	u64 tso_bytes;
 	u64 packing_mode_switch;
 	u64 stopped;
+	u64 completion_yield;
+	u64 completion_timer;
 
 	/* rtnl_link_stats64 */
 	u64 tx_packets;
@@ -482,6 +485,8 @@ struct qeth_out_q_stats {
 	u64 tx_dropped;
 };
 
+#define QETH_TX_TIMER_USECS		500
+
 struct qeth_qdio_out_q {
 	struct qdio_buffer *qdio_bufs[QDIO_MAX_BUFFERS_PER_Q];
 	struct qeth_qdio_out_buffer *bufs[QDIO_MAX_BUFFERS_PER_Q];
@@ -500,13 +505,34 @@ struct qeth_qdio_out_q {
 	atomic_t used_buffers;
 	/* indicates whether PCI flag must be set (or if one is outstanding) */
 	atomic_t set_pci_flags_count;
+	struct napi_struct napi;
+	struct timer_list timer;
 };
 
+#define qeth_for_each_output_queue(card, q, i)		\
+	for (i = 0; i < card->qdio.no_out_queues &&	\
+		    (q = card->qdio.out_qs[i]); i++)
+
+#define	qeth_napi_to_out_queue(n) container_of(n, struct qeth_qdio_out_q, napi)
+
+static inline void qeth_tx_arm_timer(struct qeth_qdio_out_q *queue)
+{
+	if (timer_pending(&queue->timer))
+		return;
+	mod_timer(&queue->timer, usecs_to_jiffies(QETH_TX_TIMER_USECS) +
+				 jiffies);
+}
+
 static inline bool qeth_out_queue_is_full(struct qeth_qdio_out_q *queue)
 {
 	return atomic_read(&queue->used_buffers) >= QDIO_MAX_BUFFERS_PER_Q;
 }
 
+static inline bool qeth_out_queue_is_empty(struct qeth_qdio_out_q *queue)
+{
+	return atomic_read(&queue->used_buffers) == 0;
+}
+
 struct qeth_qdio_info {
 	atomic_t state;
 	/* input */
--- a/drivers/s390/net/qeth_core_main.c
+++ b/drivers/s390/net/qeth_core_main.c
@@ -2282,6 +2282,14 @@ static struct qeth_qdio_out_q *qeth_allo
 	return q;
 }
 
+static void qeth_tx_completion_timer(struct timer_list *timer)
+{
+	struct qeth_qdio_out_q *queue = from_timer(queue, timer, timer);
+
+	napi_schedule(&queue->napi);
+	QETH_TXQ_STAT_INC(queue, completion_timer);
+}
+
 static int qeth_alloc_qdio_queues(struct qeth_card *card)
 {
 	int i, j;
@@ -2303,17 +2311,22 @@ static int qeth_alloc_qdio_queues(struct
 
 	/* outbound */
 	for (i = 0; i < card->qdio.no_out_queues; ++i) {
-		card->qdio.out_qs[i] = qeth_alloc_output_queue();
-		if (!card->qdio.out_qs[i])
+		struct qeth_qdio_out_q *queue;
+
+		queue = qeth_alloc_output_queue();
+		if (!queue)
 			goto out_freeoutq;
 		QETH_CARD_TEXT_(card, 2, "outq %i", i);
-		QETH_CARD_HEX(card, 2, &card->qdio.out_qs[i], sizeof(void *));
-		card->qdio.out_qs[i]->card = card;
-		card->qdio.out_qs[i]->queue_no = i;
+		QETH_CARD_HEX(card, 2, &queue, sizeof(void *));
+		card->qdio.out_qs[i] = queue;
+		queue->card = card;
+		queue->queue_no = i;
+		timer_setup(&queue->timer, qeth_tx_completion_timer, 0);
+
 		/* give outbound qeth_qdio_buffers their qdio_buffers */
 		for (j = 0; j < QDIO_MAX_BUFFERS_PER_Q; ++j) {
-			WARN_ON(card->qdio.out_qs[i]->bufs[j] != NULL);
-			if (qeth_init_qdio_out_buf(card->qdio.out_qs[i], j))
+			WARN_ON(queue->bufs[j]);
+			if (qeth_init_qdio_out_buf(queue, j))
 				goto out_freeoutqbufs;
 		}
 	}
@@ -3225,6 +3238,7 @@ static int qeth_switch_to_nonpacking_if_
 static void qeth_flush_buffers(struct qeth_qdio_out_q *queue, int index,
 			       int count)
 {
+	struct qeth_card *card = queue->card;
 	struct qeth_qdio_out_buffer *buf;
 	int rc;
 	int i;
@@ -3273,6 +3287,11 @@ static void qeth_flush_buffers(struct qe
 		qdio_flags |= QDIO_FLAG_PCI_OUT;
 	rc = do_QDIO(CARD_DDEV(queue->card), qdio_flags,
 		     queue->queue_no, index, count);
+
+	/* Fake the TX completion interrupt: */
+	if (IS_IQD(card))
+		napi_schedule(&queue->napi);
+
 	if (rc) {
 		/* ignore temporary SIGA errors without busy condition */
 		if (rc == -ENOBUFS)
@@ -3451,48 +3470,12 @@ static void qeth_qdio_output_handler(str
 		int bidx = i % QDIO_MAX_BUFFERS_PER_Q;
 		buffer = queue->bufs[bidx];
 		qeth_handle_send_error(card, buffer, qdio_error);
-
-		if (queue->bufstates &&
-		    (queue->bufstates[bidx].flags &
-		     QDIO_OUTBUF_STATE_FLAG_PENDING) != 0) {
-			WARN_ON_ONCE(card->options.cq != QETH_CQ_ENABLED);
-
-			if (atomic_cmpxchg(&buffer->state,
-					   QETH_QDIO_BUF_PRIMED,
-					   QETH_QDIO_BUF_PENDING) ==
-				QETH_QDIO_BUF_PRIMED) {
-				qeth_notify_skbs(queue, buffer,
-						 TX_NOTIFY_PENDING);
-			}
-			QETH_CARD_TEXT_(queue->card, 5, "pel%d", bidx);
-
-			/* prepare the queue slot for re-use: */
-			qeth_scrub_qdio_buffer(buffer->buffer,
-					       queue->max_elements);
-			if (qeth_init_qdio_out_buf(queue, bidx)) {
-				QETH_CARD_TEXT(card, 2, "outofbuf");
-				qeth_schedule_recovery(card);
-			}
-		} else {
-			if (card->options.cq == QETH_CQ_ENABLED) {
-				enum iucv_tx_notify n;
-
-				n = qeth_compute_cq_notification(
-					buffer->buffer->element[15].sflags, 0);
-				qeth_notify_skbs(queue, buffer, n);
-			}
-
-			qeth_clear_output_buffer(queue, buffer, qdio_error);
-		}
-		qeth_cleanup_handled_pending(queue, bidx, 0);
+		qeth_clear_output_buffer(queue, buffer, qdio_error);
 	}
+
 	atomic_sub(count, &queue->used_buffers);
-	/* check if we need to do something on this outbound queue */
-	if (!IS_IQD(card))
-		qeth_check_outbound_queue(queue);
+	qeth_check_outbound_queue(queue);
 
-	if (IS_IQD(card))
-		__queue = qeth_iqd_translate_txq(dev, __queue);
 	txq = netdev_get_tx_queue(dev, __queue);
 	/* xmit may have observed the full-condition, but not yet stopped the
 	 * txq. In which case the code below won't trigger. So before returning,
@@ -4757,7 +4740,7 @@ static int qeth_qdio_establish(struct qe
 	init_data.input_sbal_addr_array  = in_sbal_ptrs;
 	init_data.output_sbal_addr_array = out_sbal_ptrs;
 	init_data.output_sbal_state_array = card->qdio.out_bufstates;
-	init_data.scan_threshold	 = IS_IQD(card) ? 1 : 32;
+	init_data.scan_threshold	 = IS_IQD(card) ? 0 : 32;
 
 	if (atomic_cmpxchg(&card->qdio.state, QETH_QDIO_ALLOCATED,
 		QETH_QDIO_ESTABLISHED) == QETH_QDIO_ALLOCATED) {
@@ -5171,6 +5154,99 @@ out:
 }
 EXPORT_SYMBOL_GPL(qeth_poll);
 
+static void qeth_iqd_tx_complete(struct qeth_qdio_out_q *queue,
+				 unsigned int bidx, bool error)
+{
+	struct qeth_qdio_out_buffer *buffer = queue->bufs[bidx];
+	u8 sflags = buffer->buffer->element[15].sflags;
+	struct qeth_card *card = queue->card;
+
+	if (queue->bufstates && (queue->bufstates[bidx].flags &
+				 QDIO_OUTBUF_STATE_FLAG_PENDING)) {
+		WARN_ON_ONCE(card->options.cq != QETH_CQ_ENABLED);
+
+		if (atomic_cmpxchg(&buffer->state, QETH_QDIO_BUF_PRIMED,
+						   QETH_QDIO_BUF_PENDING) ==
+		    QETH_QDIO_BUF_PRIMED)
+			qeth_notify_skbs(queue, buffer, TX_NOTIFY_PENDING);
+
+		QETH_CARD_TEXT_(card, 5, "pel%u", bidx);
+
+		/* prepare the queue slot for re-use: */
+		qeth_scrub_qdio_buffer(buffer->buffer, queue->max_elements);
+		if (qeth_init_qdio_out_buf(queue, bidx)) {
+			QETH_CARD_TEXT(card, 2, "outofbuf");
+			qeth_schedule_recovery(card);
+		}
+
+		return;
+	}
+
+	if (card->options.cq == QETH_CQ_ENABLED)
+		qeth_notify_skbs(queue, buffer,
+				 qeth_compute_cq_notification(sflags, 0));
+	qeth_clear_output_buffer(queue, buffer, error);
+}
+
+static int qeth_tx_poll(struct napi_struct *napi, int budget)
+{
+	struct qeth_qdio_out_q *queue = qeth_napi_to_out_queue(napi);
+	unsigned int queue_no = queue->queue_no;
+	struct qeth_card *card = queue->card;
+	struct net_device *dev = card->dev;
+	unsigned int work_done = 0;
+	struct netdev_queue *txq;
+
+	txq = netdev_get_tx_queue(dev, qeth_iqd_translate_txq(dev, queue_no));
+
+	while (1) {
+		unsigned int start, error, i;
+		int completed;
+
+		if (qeth_out_queue_is_empty(queue)) {
+			napi_complete(napi);
+			return 0;
+		}
+
+		/* Give the CPU a breather: */
+		if (work_done >= QDIO_MAX_BUFFERS_PER_Q) {
+			QETH_TXQ_STAT_INC(queue, completion_yield);
+			if (napi_complete_done(napi, 0))
+				napi_schedule(napi);
+			return 0;
+		}
+
+		completed = qdio_inspect_queue(CARD_DDEV(card), queue_no, false,
+					       &start, &error);
+		if (completed <= 0) {
+			/* Ensure we see TX completion for pending work: */
+			if (napi_complete_done(napi, 0))
+				qeth_tx_arm_timer(queue);
+			return 0;
+		}
+
+		for (i = start; i < start + completed; i++) {
+			unsigned int bidx = QDIO_BUFNR(i);
+
+			qeth_handle_send_error(card, queue->bufs[bidx], error);
+			qeth_iqd_tx_complete(queue, bidx, error);
+			qeth_cleanup_handled_pending(queue, bidx, false);
+		}
+
+		atomic_sub(completed, &queue->used_buffers);
+		work_done += completed;
+
+		/* xmit may have observed the full-condition, but not yet
+		 * stopped the txq. In which case the code below won't trigger.
+		 * So before returning, xmit will re-check the txq's fill level
+		 * and wake it up if needed.
+		 */
+		if (netif_tx_queue_stopped(txq) &&
+		    !qeth_out_queue_is_full(queue))
+			netif_tx_wake_queue(txq);
+	}
+}
+
 static int qeth_setassparms_inspect_rc(struct qeth_ipa_cmd *cmd)
 {
 	if (!cmd->hdr.return_code)
@@ -6117,6 +6193,17 @@ int qeth_open(struct net_device *dev)
 	napi_enable(&card->napi);
 	local_bh_disable();
 	napi_schedule(&card->napi);
+	if (IS_IQD(card)) {
+		struct qeth_qdio_out_q *queue;
+		unsigned int i;
+
+		qeth_for_each_output_queue(card, queue, i) {
+			netif_tx_napi_add(dev, &queue->napi, qeth_tx_poll,
+					  QETH_NAPI_WEIGHT);
+			napi_enable(&queue->napi);
+			napi_schedule(&queue->napi);
+		}
+	}
 	/* kick-start the NAPI softirq: */
 	local_bh_enable();
 	return 0;
@@ -6128,7 +6215,26 @@ int qeth_stop(struct net_device *dev)
 	struct qeth_card *card = dev->ml_priv;
 
 	QETH_CARD_TEXT(card, 4, "qethstop");
-	netif_tx_disable(dev);
+	if (IS_IQD(card)) {
+		struct qeth_qdio_out_q *queue;
+		unsigned int i;
+
+		/* Quiesce the NAPI instances: */
+		qeth_for_each_output_queue(card, queue, i) {
+			napi_disable(&queue->napi);
+			del_timer_sync(&queue->timer);
+		}
+
+		/* Stop .ndo_start_xmit, might still access queue->napi. */
+		netif_tx_disable(dev);
+
+		/* Queues may get re-allocated, so remove the NAPIs here. */
+		qeth_for_each_output_queue(card, queue, i)
+			netif_napi_del(&queue->napi);
+	} else {
+		netif_tx_disable(dev);
+	}
+
 	napi_disable(&card->napi);
 	return 0;
 }
--- a/drivers/s390/net/qeth_ethtool.c
+++ b/drivers/s390/net/qeth_ethtool.c
@@ -39,6 +39,8 @@ static const struct qeth_stats txq_stats
 	QETH_TXQ_STAT("TSO bytes", tso_bytes),
 	QETH_TXQ_STAT("Packing mode switches", packing_mode_switch),
 	QETH_TXQ_STAT("Queue stopped", stopped),
+	QETH_TXQ_STAT("Completion yield", completion_yield),
+	QETH_TXQ_STAT("Completion timer", completion_timer),
 };
 
 static const struct qeth_stats card_stats[] = {