Blob Blame History Raw
From: Julian Wiedmann <jwi@linux.ibm.com>
Date: Fri, 20 Nov 2020 10:09:38 +0100
Subject: s390/qeth: fix af_iucv notification race
Git-commit: 8908f36d20d8ba610d3a7d110b3049b5853b9bb1
Patch-mainline: v5.10-rc6
References: git-fixes

The two expected notification sequences are
1. TX_NOTIFY_PENDING with a subsequent TX_NOTIFY_DELAYED_*, when
   our TX completion code first observed the pending TX and the QAOB
   then completes at a later time; or
2. TX_NOTIFY_OK, when qeth_qdio_handle_aob() picked up the QAOB
   completion before our TX completion code even noticed that the TX
   was pending.

But as qeth_iqd_tx_complete() and qeth_qdio_handle_aob() can run
concurrently, we may end up with a race that results in a sequence of
TX_NOTIFY_DELAYED_* followed by TX_NOTIFY_PENDING. Which would confuse
the af_iucv code in its tracking of pending transmits.

Rework the notification code, so that qeth_qdio_handle_aob() defers its
notification if the TX completion code is still active.

Fixes: b333293058aa ("qeth: add support for af_iucv HiperSockets transport")
Signed-off-by: Julian Wiedmann <jwi@linux.ibm.com>
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
Acked-by: Petr Tesarik <ptesarik@suse.com>
---
 drivers/s390/net/qeth_core.h      |   15 +++-----
 drivers/s390/net/qeth_core_main.c |   70 +++++++++++++++++++++++++++-----------
 2 files changed, 56 insertions(+), 29 deletions(-)

--- a/drivers/s390/net/qeth_core.h
+++ b/drivers/s390/net/qeth_core.h
@@ -427,16 +427,13 @@ enum qeth_qdio_buffer_states {
 	 * outbound: filled by driver; owned by hardware in order to be sent
 	 */
 	QETH_QDIO_BUF_PRIMED,
-	/*
-	 * inbound: not applicable
-	 * outbound: identified to be pending in TPQ
-	 */
+	/* Discovered by the TX completion code: */
 	QETH_QDIO_BUF_PENDING,
-	/*
-	 * inbound: not applicable
-	 * outbound: found in completion queue
-	 */
-	QETH_QDIO_BUF_IN_CQ,
+	/* Finished by the TX completion code: */
+	QETH_QDIO_BUF_NEED_QAOB,
+	/* Received QAOB notification on CQ: */
+	QETH_QDIO_BUF_QAOB_OK,
+	QETH_QDIO_BUF_QAOB_ERROR,
 	/*
 	 * inbound: not applicable
 	 * outbound: handled via transfer pending / completion queue
--- a/drivers/s390/net/qeth_core_main.c
+++ b/drivers/s390/net/qeth_core_main.c
@@ -488,6 +488,7 @@ static void qeth_cleanup_handled_pending
 static void qeth_qdio_handle_aob(struct qeth_card *card,
 				 unsigned long phys_aob_addr)
 {
+	enum qeth_qdio_buffer_states new_state = QETH_QDIO_BUF_QAOB_OK;
 	struct qaob *aob;
 	struct qeth_qdio_out_buffer *buffer;
 	enum iucv_tx_notify notification;
@@ -499,22 +500,6 @@ static void qeth_qdio_handle_aob(struct
 	buffer = (struct qeth_qdio_out_buffer *) aob->user1;
 	QETH_CARD_TEXT_(card, 5, "%lx", aob->user1);
 
-	if (atomic_cmpxchg(&buffer->state, QETH_QDIO_BUF_PRIMED,
-			   QETH_QDIO_BUF_IN_CQ) == QETH_QDIO_BUF_PRIMED) {
-		notification = TX_NOTIFY_OK;
-	} else {
-		WARN_ON_ONCE(atomic_read(&buffer->state) !=
-							QETH_QDIO_BUF_PENDING);
-		atomic_set(&buffer->state, QETH_QDIO_BUF_IN_CQ);
-		notification = TX_NOTIFY_DELAYED_OK;
-	}
-
-	if (aob->aorc != 0)  {
-		QETH_CARD_TEXT_(card, 2, "aorc%02X", aob->aorc);
-		notification = qeth_compute_cq_notification(aob->aorc, 1);
-	}
-	qeth_notify_skbs(buffer->q, buffer, notification);
-
 	/* Free dangling allocations. The attached skbs are handled by
 	 * qeth_cleanup_handled_pending().
 	 */
@@ -525,7 +510,33 @@ static void qeth_qdio_handle_aob(struct
 			kmem_cache_free(qeth_core_header_cache,
 					(void *) aob->sba[i]);
 	}
-	atomic_set(&buffer->state, QETH_QDIO_BUF_HANDLED_DELAYED);
+
+	if (aob->aorc) {
+		QETH_CARD_TEXT_(card, 2, "aorc%02X", aob->aorc);
+		new_state = QETH_QDIO_BUF_QAOB_ERROR;
+	}
+
+	switch (atomic_xchg(&buffer->state, new_state)) {
+	case QETH_QDIO_BUF_PRIMED:
+		/* Faster than TX completion code. */
+		notification = qeth_compute_cq_notification(aob->aorc, 0);
+		qeth_notify_skbs(buffer->q, buffer, notification);
+		atomic_set(&buffer->state, QETH_QDIO_BUF_HANDLED_DELAYED);
+		break;
+	case QETH_QDIO_BUF_PENDING:
+		/* TX completion code is active and will handle the async
+		 * completion for us.
+		 */
+		break;
+	case QETH_QDIO_BUF_NEED_QAOB:
+		/* TX completion code is already finished. */
+		notification = qeth_compute_cq_notification(aob->aorc, 1);
+		qeth_notify_skbs(buffer->q, buffer, notification);
+		atomic_set(&buffer->state, QETH_QDIO_BUF_HANDLED_DELAYED);
+		break;
+	default:
+		WARN_ON_ONCE(1);
+	}
 
 	qdio_release_aob(aob);
 }
@@ -1206,9 +1217,6 @@ static void qeth_release_skbs(struct qet
 {
 	struct sk_buff *skb;
 
-	/* release may never happen from within CQ tasklet scope */
-	WARN_ON_ONCE(atomic_read(&buf->state) == QETH_QDIO_BUF_IN_CQ);
-
 	if (atomic_read(&buf->state) == QETH_QDIO_BUF_PENDING)
 		qeth_notify_skbs(buf->q, buf, TX_NOTIFY_GENERALERROR);
 
@@ -3657,6 +3665,28 @@ static void qeth_qdio_output_handler(str
 				QETH_QDIO_BUF_PRIMED) {
 				qeth_notify_skbs(queue, buffer,
 						 TX_NOTIFY_PENDING);
+
+				/* Handle race with qeth_qdio_handle_aob(): */
+				switch (atomic_xchg(&buffer->state,
+						    QETH_QDIO_BUF_NEED_QAOB)) {
+				case QETH_QDIO_BUF_PENDING:
+					/* No concurrent QAOB notification. */
+					break;
+				case QETH_QDIO_BUF_QAOB_OK:
+					qeth_notify_skbs(queue, buffer,
+							 TX_NOTIFY_DELAYED_OK);
+					atomic_set(&buffer->state,
+						   QETH_QDIO_BUF_HANDLED_DELAYED);
+					break;
+				case QETH_QDIO_BUF_QAOB_ERROR:
+					qeth_notify_skbs(queue, buffer,
+							 TX_NOTIFY_DELAYED_GENERALERROR);
+					atomic_set(&buffer->state,
+						   QETH_QDIO_BUF_HANDLED_DELAYED);
+					break;
+				default:
+					WARN_ON_ONCE(1);
+				}
 			}
 			QETH_CARD_TEXT_(queue->card, 5, "pel%d", bidx);