Blob Blame History Raw
From: Magnus Karlsson <magnus.karlsson@intel.com>
Date: Thu, 19 Dec 2019 13:39:30 +0100
Subject: xsk: Add function naming comments and reorder functions
Patch-mainline: v5.6-rc1
Git-commit: 15d8c9162ced1789d25261859a7b37db8426e409
References: jsc#SLE-13706

Add comments on how the ring access functions are named and how they
are supposed to be used for producers and consumers. The functions are
also reordered so that the consumer functions are in the beginning and
the producer functions in the end, for easier reference. Put this in a
separate patch as the diff might look a little odd, but no
functionality has changed in this patch.

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
Link: https://lore.kernel.org/bpf/1576759171-28550-12-git-send-email-magnus.karlsson@intel.com
Acked-by: Thomas Bogendoerfer <tbogendoerfer@suse.de>
---
 net/xdp/xsk.c       |   10 +
 net/xdp/xsk_queue.h |  293 +++++++++++++++++++++++++++-------------------------
 2 files changed, 167 insertions(+), 136 deletions(-)

--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -319,6 +319,11 @@ bool xsk_umem_consume_tx(struct xdp_umem
 		if (!xskq_cons_peek_desc(xs->tx, desc, umem))
 			continue;
 
+		/* This is the backpreassure mechanism for the Tx path.
+		 * Reserve space in the completion queue and only proceed
+		 * if there is space in it. This avoids having to implement
+		 * any buffering in the Tx path.
+		 */
 		if (xskq_prod_reserve_addr(umem->cq, desc->addr))
 			goto out;
 
@@ -398,6 +403,11 @@ static int xsk_generic_xmit(struct sock
 		addr = desc.addr;
 		buffer = xdp_umem_get_data(xs->umem, addr);
 		err = skb_store_bits(skb, 0, buffer, len);
+		/* This is the backpreassure mechanism for the Tx path.
+		 * Reserve space in the completion queue and only proceed
+		 * if there is space in it. This avoids having to implement
+		 * any buffering in the Tx path.
+		 */
 		if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) {
 			kfree_skb(skb);
 			goto out;
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -81,60 +81,27 @@ struct xsk_queue {
  * now and again after circling through the ring.
  */
 
-/* Common functions operating for both RXTX and umem queues */
-
-static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
-{
-	return q ? q->invalid_descs : 0;
-}
-
-static inline void __xskq_cons_release(struct xsk_queue *q)
-{
-	smp_mb(); /* D, matches A */
-	WRITE_ONCE(q->ring->consumer, q->cached_cons);
-}
-
-static inline void __xskq_cons_peek(struct xsk_queue *q)
-{
-	/* Refresh the local pointer */
-	q->cached_prod = READ_ONCE(q->ring->producer);
-	smp_rmb(); /* C, matches B */
-}
-
-static inline void xskq_cons_get_entries(struct xsk_queue *q)
-{
-	__xskq_cons_release(q);
-	__xskq_cons_peek(q);
-}
-
-static inline bool xskq_prod_is_full(struct xsk_queue *q)
-{
-	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
-
-	if (free_entries)
-		return false;
-
-	/* Refresh the local tail pointer */
-	q->cached_cons = READ_ONCE(q->ring->consumer);
-	free_entries = q->nentries - (q->cached_prod - q->cached_cons);
-
-	return !free_entries;
-}
-
-static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
-{
-	u32 entries = q->cached_prod - q->cached_cons;
-
-	if (entries >= cnt)
-		return true;
-
-	__xskq_cons_peek(q);
-	entries = q->cached_prod - q->cached_cons;
-
-	return entries >= cnt;
-}
+/* The operations on the rings are the following:
+ *
+ * producer                           consumer
+ *
+ * RESERVE entries                    PEEK in the ring for entries
+ * WRITE data into the ring           READ data from the ring
+ * SUBMIT entries                     RELEASE entries
+ *
+ * The producer reserves one or more entries in the ring. It can then
+ * fill in these entries and finally submit them so that they can be
+ * seen and read by the consumer.
+ *
+ * The consumer peeks into the ring to see if the producer has written
+ * any new entries. If so, the producer can then read these entries
+ * and when it is done reading them release them back to the producer
+ * so that the producer can use these slots to fill in new entries.
+ *
+ * The function names below reflect these operations.
+ */
 
-/* UMEM queue */
+/* Functions that read and validate content from consumer rings. */
 
 static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
 						   u64 addr,
@@ -148,16 +115,6 @@ static inline bool xskq_cons_crosses_non
 	return cross_pg && !next_pg_contig;
 }
 
-static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
-{
-	if (addr >= q->size) {
-		q->invalid_descs++;
-		return false;
-	}
-
-	return true;
-}
-
 static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
 						u64 addr,
 						u64 length,
@@ -175,6 +132,16 @@ static inline bool xskq_cons_is_valid_un
 	return true;
 }
 
+static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
+{
+	if (addr >= q->size) {
+		q->invalid_descs++;
+		return false;
+	}
+
+	return true;
+}
+
 static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
 				       struct xdp_umem *umem)
 {
@@ -203,75 +170,6 @@ out:
 	return false;
 }
 
-static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
-				       struct xdp_umem *umem)
-{
-	if (q->cached_prod == q->cached_cons)
-		xskq_cons_get_entries(q);
-	return xskq_cons_read_addr(q, addr, umem);
-}
-
-static inline void xskq_cons_release(struct xsk_queue *q)
-{
-	/* To improve performance, only update local state here.
-	 * Do the actual release operation when we get new entries
-	 * from the ring in xskq_cons_get_entries() instead.
-	 */
-	q->cached_cons++;
-}
-
-static inline int xskq_prod_reserve(struct xsk_queue *q)
-{
-	if (xskq_prod_is_full(q))
-		return -ENOSPC;
-
-	/* A, matches D */
-	q->cached_prod++;
-	return 0;
-}
-
-static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
-{
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-
-	if (xskq_prod_is_full(q))
-		return -ENOSPC;
-
-	/* A, matches D */
-	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
-	return 0;
-}
-
-static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
-{
-	/* Order producer and data */
-	smp_wmb(); /* B, matches C */
-
-	WRITE_ONCE(q->ring->producer, idx);
-}
-
-static inline void xskq_prod_submit(struct xsk_queue *q)
-{
-	__xskq_prod_submit(q, q->cached_prod);
-}
-
-static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
-{
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-	u32 idx = q->ring->producer;
-
-	ring->desc[idx++ & q->ring_mask] = addr;
-
-	__xskq_prod_submit(q, idx);
-}
-
-static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
-{
-	__xskq_prod_submit(q, q->ring->producer + nb_entries);
-}
-
-/* Rx/Tx queue */
-
 static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
 					   struct xdp_desc *d,
 					   struct xdp_umem *umem)
@@ -318,6 +216,48 @@ static inline bool xskq_cons_read_desc(s
 	return false;
 }
 
+/* Functions for consumers */
+
+static inline void __xskq_cons_release(struct xsk_queue *q)
+{
+	smp_mb(); /* D, matches A */
+	WRITE_ONCE(q->ring->consumer, q->cached_cons);
+}
+
+static inline void __xskq_cons_peek(struct xsk_queue *q)
+{
+	/* Refresh the local pointer */
+	q->cached_prod = READ_ONCE(q->ring->producer);
+	smp_rmb(); /* C, matches B */
+}
+
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
+{
+	__xskq_cons_release(q);
+	__xskq_cons_peek(q);
+}
+
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+	u32 entries = q->cached_prod - q->cached_cons;
+
+	if (entries >= cnt)
+		return true;
+
+	__xskq_cons_peek(q);
+	entries = q->cached_prod - q->cached_cons;
+
+	return entries >= cnt;
+}
+
+static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+				       struct xdp_umem *umem)
+{
+	if (q->cached_prod == q->cached_cons)
+		xskq_cons_get_entries(q);
+	return xskq_cons_read_addr(q, addr, umem);
+}
+
 static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
 				       struct xdp_desc *desc,
 				       struct xdp_umem *umem)
@@ -327,6 +267,60 @@ static inline bool xskq_cons_peek_desc(s
 	return xskq_cons_read_desc(q, desc, umem);
 }
 
+static inline void xskq_cons_release(struct xsk_queue *q)
+{
+	/* To improve performance, only update local state here.
+	 * Reflect this to global state when we get new entries
+	 * from the ring in xskq_cons_get_entries().
+	 */
+	q->cached_cons++;
+}
+
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
+{
+	/* No barriers needed since data is not accessed */
+	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
+		q->nentries;
+}
+
+/* Functions for producers */
+
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
+{
+	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+	if (free_entries)
+		return false;
+
+	/* Refresh the local tail pointer */
+	q->cached_cons = READ_ONCE(q->ring->consumer);
+	free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+	return !free_entries;
+}
+
+static inline int xskq_prod_reserve(struct xsk_queue *q)
+{
+	if (xskq_prod_is_full(q))
+		return -ENOSPC;
+
+	/* A, matches D */
+	q->cached_prod++;
+	return 0;
+}
+
+static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+	if (xskq_prod_is_full(q))
+		return -ENOSPC;
+
+	/* A, matches D */
+	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
+	return 0;
+}
+
 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 					 u64 addr, u32 len)
 {
@@ -344,11 +338,31 @@ static inline int xskq_prod_reserve_desc
 	return 0;
 }
 
-static inline bool xskq_cons_is_full(struct xsk_queue *q)
+static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-	/* No barriers needed since data is not accessed */
-	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
-		q->nentries;
+	smp_wmb(); /* B, matches C */
+
+	WRITE_ONCE(q->ring->producer, idx);
+}
+
+static inline void xskq_prod_submit(struct xsk_queue *q)
+{
+	__xskq_prod_submit(q, q->cached_prod);
+}
+
+static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+	u32 idx = q->ring->producer;
+
+	ring->desc[idx++ & q->ring_mask] = addr;
+
+	__xskq_prod_submit(q, idx);
+}
+
+static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
+{
+	__xskq_prod_submit(q, q->ring->producer + nb_entries);
 }
 
 static inline bool xskq_prod_is_empty(struct xsk_queue *q)
@@ -357,6 +371,13 @@ static inline bool xskq_prod_is_empty(st
 	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
 }
 
+/* For both producers and consumers */
+
+static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+{
+	return q ? q->invalid_descs : 0;
+}
+
 void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
 void xskq_destroy(struct xsk_queue *q_ops);